asyncio 实际上如何工作?

2024-12-23 08:43:00
admin
原创
90
摘要:问题描述:这个问题是由我的另一个问题引发的:如何在 cdef 中等待?网络上有大量关于的文章和博客文章asyncio,但它们都很肤浅。我找不到任何关于它asyncio是如何实际实现的以及是什么让 I/O 异步的信息。我试图阅读源代码,但它有数千行不是最高级别的 C 代码,其中很多都处理辅助对象,但最重要的是,...

问题描述:

这个问题是由我的另一个问题引发的:如何在 cdef 中等待?

网络上有大量关于的文章和博客文章asyncio,但它们都很肤浅。我找不到任何关于它asyncio是如何实际实现的以及是什么让 I/O 异步的信息。我试图阅读源代码,但它有数千行不是最高级别的 C 代码,其中很多都处理辅助对象,但最重要的是,很难将 Python 语法与它将翻译成的 C 代码联系起来。

Asycnio 自己的文档就更没有帮助了。文档中没有关于它如何工作的信息,只有一些关于如何使用它的指南,而且这些指南有时还具有误导性/写得非常糟糕。

我熟悉 Go 的协程实现,并希望 Python 也能做同样的事情。如果是这样的话,我在上面链接的帖子中提出的代码应该可以工作。既然它没有工作,我现在正在试图找出原因。到目前为止,我最好的猜测如下,请纠正我哪里错了:

  1. 形式的过程定义async def foo(): ...实际上被解释为继承的类的方法coroutine

  2. 也许,async def实际上是通过语句分成多个方法await,其中调用这些方法的对象能够跟踪迄今为止执行的进度。

  3. 如果上述情况属实,那么,本质上,协程的执行归结为通过某个全局管理器(循环?)调用协程对象的方法。

  4. 全局管理器以某种方式(如何?)知道何时由 Python(仅?)代码执行 I/O 操作,并且能够在当前执行方法放弃控制(命中语句await)后选择其中一个待处理的协程方法进行执行。

换句话说,这是我尝试将某些asyncio语法“去糖化”为更易于理解的内容:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

如果我的猜测是正确的,那么我有一个问题。在这种情况下,I/O 实际上是怎样发生的?在单独的线程中?整个解释器是否被暂停,并且 I/O 发生在解释器之外?I/O 到底是什么意思?如果我的 Python 过程调用 Copen()过程,而它又向内核发送中断,放弃对它的控制,那么 Python 解释器如何知道这一点并能够继续运行其他代码,而内核代码执行实际的 I/O,直到它唤醒最初发送中断的 Python 过程?原则上,Python 解释器如何知道这种情况的发生?


解决方案 1:

asyncio 如何工作?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,请跳过这些。

生成器

生成器是允许我们暂停执行 Python 函数的对象。用户自定义的生成器使用关键字 实现yield。通过创建包含关键字的普通函数yield,我们将该函数变成生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,调用next()生成器会导致解释器加载测试的框架,并返回yield已赋值的值。next()再次调用会导致框架再次加载到解释器堆栈中,并继续yield赋值另一个值。

到第三次next()调用时,我们的生成器已完成并被StopIteration抛出。

与发电机通信

生成器的一个鲜为人知的功能是,您可以使用两种方法与它们通信:send()throw()

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

调用时gen.send(),该值作为关键字的返回值传递yield

gen.throw()另一方面,允许在生成器内部抛出异常,并在调用异常的同一位置yield引发异常。

从生成器返回值

从生成器返回一个值,会导致该值被放入StopIteration异常中。我们稍后可以从异常中恢复该值并根据需要使用它。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看,一个新的关键字:yield from

Python 3.4 增加了一个新关键字:yield from。该关键字允许我们将任何next()send()和传递throw()到最内层的嵌套生成器中。如果内部生成器返回一个值,它也是 的返回值yield from

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我写了一篇文章来进一步阐述这个话题。

综合起来

在 Python 3.4 中引入 new 关键字后yield from,我们现在能够在生成器内部创建生成器,就像一条隧道一样,将数据从最内层的生成器传递到最外层的生成器。这为生成器赋予了新的含义 -协程

协程是可以在运行时停止和恢复的函数。在 Python 中,它们使用async def关键字定义。与生成器非常相似,它们也使用自己的形式,yield from即。在 Python 3.5 中引入和await之前,我们创建协程的方式与创建生成器的方式完全相同(使用而不是)。async`awaityield fromawait`

async def inner():
    return 1
    
async def outer():
    await inner()

就像所有迭代器和生成器都实现该__iter__()方法一样,所有协程都实现了__await__()允许它们在每次await coro调用时继续执行的方法。

Python 文档中有一个很好的序列图,您应该查看一下。

在 asyncio 中,除了协程函数之外,我们还有两个重要对象:任务未来

期货

Future 是已__await__()实现该方法的对象,其作用是保存特定状态和结果。状态可以是以下之一:

  1. 待定 – 未来没有任何结果或异常设置。

  2. CANCELLED-未来已被取消,使用fut.cancel()

  3. FINISHED - Future 已经完成,要么通过使用结果集fut.set_result(),要么通过使用异常集fut.set_exception()

结果就像您猜测的那样,可以是返回的 Python 对象,也可以是引发的异常。

对象的另一个重要特征future是它们包含一个名为 的方法add_done_callback()。此方法允许在任务完成后立即调用函数 - 无论是引发异常还是完成。

任务

任务对象是特殊的 Future,它环绕着协程,并与最内层和最外层的协程进行通信。每次协程await创建 Future 时,Future 都会一路传回任务(就像在 中一样yield from),然后任务会接收它。

接下来,任务将自身绑定到未来。它通过调用add_done_callback()未来来实现这一点。从现在开始,如果未来即将完成,无论是被取消、传递异常还是传递 Python 对象,都将调用任务的回调,并且它将重新存在。

异步

我们必须回答的最后一个热门问题是——IO 是如何实现的?

在 asyncio 内部,我们有一个事件循环。一个任务事件循环。事件循环的工作是每次任务就绪时调用它们,并将所有这些工作协调到一个工作机器上。

事件循环的 IO 部分基于一个名为 的关键函数select。Select 是一个阻塞函数,由底层操作系统实现,允许等待套接字接收或发送数据。收到数据后,它会被唤醒,并返回已接收数据的套接字或已准备好写入的套接字。

当您尝试通过 asyncio 在套接字上接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果其.send()缓冲区已满,或者.recv()缓冲区为空,则将套接字注册到函数select(只需将其添加到其中一个列表rlistforrecvwlistfor 中send),并将相应的函数注册await为一个新创建的future对象,该对象绑定到该套接字。

当所有可用任务都在等待 Future 时,事件循环将调用select并等待。当其中一个套接字有传入数据或其send缓冲区已耗尽时,asyncio 将检查与该套接字绑定的 Future 对象,并将其设置为完成。

现在所有魔法都发生了。未来已设置为完成,之前添加的任务add_done_callback()重新开始,并调用.send()恢复最内层协程的协程(由于链式await),然后您可以从溢出到的附近缓冲区读取新接收的数据。

再次方法链,如果发生以下情况recv()

  1. select.select等待。

  2. 返回一个已就绪的套接字,其中包含数据。

  3. 来自套接字的数据被移入缓冲区。

  4. future.set_result()被称为。

  5. 添加自身的任务add_done_callback()现已被唤醒。

  6. 任务调用.send()协程,一直进入最内层的协程并将其唤醒。

  7. 数据正在从缓冲区读取并返回给我们卑微的用户。

总之,asyncio 使用生成器功能,允许暂停和恢复函数。它使用yield from允许在最内层生成器和最外层生成器之间来回传递数据的功能。它使用所有这些功能来在等待 IO 完成(通过使用 OSselect函数)时暂停函数执行。

最好的是什么?当一个功能暂停时,另一个功能可能会运行并与 asyncio 这个精巧的结构交错。

解决方案 2:

谈论async/awaitasyncio并不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有单一的最终答案。

以下是async/awaitasyncio类库如何工作的一般描述。也就是说,可能还有其他技巧(有……),但除非您自己构建它们,否则它们无关紧要。除非您已经知道足够多的信息而不必问这样的问题,否则差异应该可以忽略不计。

  1. 协程与子程序的比较

就像子程序(函数、过程等)一样,协程(生成器等)是调用堆栈和指令指针的抽象:有一个正在执行的代码片段的堆栈,每个代码片段都位于特定的指令上。

defversus的区别async def只是为了清晰起见。实际的区别是returnversus yield。由此,await或者yield from从单个调用到整个堆栈的区别来看。

1.1. 子程序

子程序表示一个新的堆栈级别,用于保存局部变量,并单次遍历其指令以到达结尾。考虑这样的子程序:

def subfoo(bar):
     qux = 3
     return qux * bar

当你运行它时,这意味着

  1. 分配堆栈bar空间qux

  2. 递归执行第一个语句并跳转到下一个语句

  3. 一次return,将其值推送到调用堆栈

  4. 清除堆栈(1.)和指令指针(2.)

值得注意的是,4. 表示子程序始终从同一状态开始。函数本身独有的所有内容在完成后都会丢失。函数无法恢复,即使之后有指令return

root -\n  :    - subfoo --\n  :/--<---return --/
  |
  V

1.2. 协程作为持久子程序

协程类似于子程序,但可以在不破坏其状态的情况下退出。考虑这样的协程:

 def cofoo(bar):
      qux = yield bar  # yield marks a break point
      return qux

当你运行它时,这意味着

  1. 分配堆栈bar空间qux

  2. 递归执行第一个语句并跳转到下一个语句

    1. 一次yield,将其值推送到调用堆栈,但存储堆栈和指令指针

    2. 一旦调用yield,恢复堆栈和指令指针并将参数推送到qux

  3. 一次return,将其值推送到调用堆栈

  4. 清除堆栈(1.)和指令指针(2.)

请注意添加了 2.1 和 2.2 - 协程可以在预定义点暂停和恢复。这类似于在调用另一个子程序时暂停子程序的方式。不同之处在于,活动协程并不严格绑定到其调用堆栈。相反,暂停的协程是单独的、隔离的堆栈的一部分。

root -\n  :    - cofoo --\n  :/--<+--yield --/
  |    :
  V    :

这意味着暂停的协程可以自由存储或在堆栈之间移动。任何有权访问协程的调用堆栈都可以决定恢复它。

1.3. 遍历调用堆栈

到目前为止,我们的协程仅使用 沿调用堆栈向下移动yield。子程序可以使用和 沿调用堆栈向下return和向上移动()。为了完整起见,协程还需要一种沿调用堆栈向上移动的机制。考虑这样的协程:

def wrap():
    yield 'before'
    yield from cofoo()
    yield 'after'

当你运行它时,这意味着它仍然像子程序一样分配堆栈和指令指针。当它暂停时,这仍然像存储子程序一样。

但是,会同时yield from执行。它会暂停 的堆栈和指令指针,然后运行​​。请注意 会一直暂停,直到完全完成。无论何时暂停 或发送 ,都会直接连接到调用堆栈。wrap cofoo`wrapcofoocofoo`cofoo

1.4. 协程一路向下

如上所述,yield from允许跨另一个中间范围连接两个范围。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。

root -\n  :    -> coro_a -yield-from-> coro_b --\n  :/ <-+------------------------yield ---/
  |    :
  : --+-- coro_a.send----------yield ---\n  :                             coro_b <-/

请注意,rootcoro_b彼此不了解。这使得协程比回调更简洁:协程仍然像子程序一样建立在 1:1 关系上。协程会暂停并恢复其整个现有执行堆栈,直到常规调用点。

值得注意的是,root可以恢复任意数量的协程。但是,它永远不能同时恢复多个协程。同一根的协程是并发的,但不是并行的!

1.5. Pythonasyncawait

到目前为止,解释已明确使用了生成器的yieldyield from词汇 - 底层功能是相同的。新的 Python3.5 语法asyncawait主要是为了清晰起见而存在。

def foo():  # subroutine?
     return None

def foo():  # coroutine?
     yield from foofoo()  # generator? coroutine?

async def foo():  # coroutine!
     await foofoo()  # coroutine!
     return None

async for需要and语句是因为如果使用裸and语句async with则会打断链条。yield from/await`for`with

  1. 简单事件循环的剖析

协程本身没有将控制权移交给另一个协程的概念。它只能将控制权移交给协程堆栈底部的调用者。然后,该调用者可以切换到另一个协程并运行它。

多个协程的根节点通常是一个事件循环:在暂停时,协程会产生一个事件,并希望在该事件上恢复。反过来,事件循环能够有效地等待这些事件的发生。这使其能够决定接下来要运行哪个协程,或者在恢复之前如何等待。

这样的设计意味着循环理解一组预定义的事件。几个协程await相互执行,直到最终生成一个事件await。该事件可以通过控制直接与事件循环通信。yield

loop -\n  :    -> coroutine --await--> event --\n  :/ <-+----------------------- yield --/
  |    :
  |    :  # loop waits for event to happen
  |    :
  : --+-- send(reply) -------- yield --\n  :        coroutine <--yield-- event <-/

关键在于协程暂停允许事件循环和事件直接通信。中间协程堆栈不需要知道哪个循环正在运行它,也不需要知道事件如何工作。

2.1.1. 时间事件

要处理的最简单的事件是到达某个时间点。这也是线程代码的基本块:线程重复sleep执行直到条件为真。但是,常规sleep程序会自行阻止执行 - 我们希望其他协程不会被阻止。相反,我们希望告诉事件循环何时应恢复当前协程堆栈。

2.1.2. 定义事件

事件只是我们可以识别的值 - 无论是通过枚举、类型还是其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以直接允许await类。

class AsyncSleep:
    """Event to sleep until a point in time"""
    def __init__(self, until: float):
        self.until = until

    # used whenever someone ``await``s an instance of this Event
    def __await__(self):
        # yield this Event to the loop
        yield self
    
    def __repr__(self):
        return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

该类仅存储事件 - 它并没有说明如何实际处理它。

唯一的特殊之处是__await__- 它就是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1. 等待事件

现在我们有了事件,那么协程如何对它做出反应呢?我们应该能够sleep通过awaiting 我们的事件来表达等效内容。为了更好地了解正在发生的事情,我们等待了一半的时间:

import time

async def asleep(duration: float):
    """await that ``duration`` seconds pass"""
    await AsyncSleep(time.time() + duration / 2)
    await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行此协程。与生成器类似,使用coroutine.send运行协程直到yield得到结果。

coroutine = asleep(100)
while True:
    print(coroutine.send(None))
    time.sleep(0.1)

这为我们提供了两个AsyncSleep事件,然后StopIteration是协程完成时的一个事件。请注意,唯一的延迟来自time.sleep循环中!每个事件AsyncSleep仅存储与当前时间的偏移量。

2.2.2. 事件 + 睡眠

目前,我们有两种独立的机制可供使用:

  • AsyncSleep可以从协程内部产生的事件

  • time.sleep可以等待而不影响协程

值得注意的是,这两者是正交的:任何一个都不会影响或触发另一个。因此,我们可以想出自己的策略来sleep应对延迟AsyncSleep

2.3. 简单的事件循环

如果我们有多个协程,每个协程都可以告诉我们何时需要唤醒。然后,我们可以等到第一个协程需要恢复,然后再等下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪一个

这使得调度变得简单:

  1. 根据期望唤醒时间对协程进行排序

  2. 选择第一个想醒来的人

  3. 等到这个时间点

  4. 运行该协程

  5. 从 1 重复。

一个简单的实现不需要任何高级概念。Alist允许按日期对协程进行排序。等待是常规的time.sleep。运行协程的工作方式与之前一样coroutine.send

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    # store wake-up-time and coroutines
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting:
        # 2. pick the first coroutine that wants to wake up
        until, coroutine = waiting.pop(0)
        # 3. wait until this point in time
        time.sleep(max(0.0, until - time.time()))
        # 4. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])

当然,这还有很大的改进空间。我们可以使用堆作为等待队列,或使用分派表来处理事件。我们还可以从中获取返回值StopIteration并将它们分配给协程。但是,基本原理保持不变。

2.4. 合作等待

事件AsyncSleeprun事件循环是定时事件的完整工作实现。

async def sleepy(identifier: str = "coroutine", count=5):
    for i in range(count):
        print(identifier, 'step', i + 1, 'at %.2f' % time.time())
        await asleep(0.1)

run(*(sleepy("coroutine %d" % j) for j in range(5)))

这会在五个协程之间协作切换,每个协程暂停 0.1 秒。尽管事件循环是同步的,但它仍会在 0.5 秒内(而不是 2.5 秒)执行工作。每个协程都保持状态并独立运行。

  1. I/O 事件循环

支持 的事件循环sleep适合轮询。但是,等待文件句柄上的 I/O 可以更高效地完成:操作系统实现 I/O,因此知道哪些句柄已准备就绪。理想情况下,事件循环应支持显式的“I/O 就绪”事件。

3.1.select调用

Python 已经有一个接口可以向操作系统查询读取 I/O 句柄。当使用要读取或写入的句柄调用时,它会返回准备好读取或写入的句柄:

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以open写入一个文件并等待它准备就绪:

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

一旦选择返回,writable就包含我们打开的文件。

3.2. 基本 I/O 事件

与请求类似AsyncSleep,我们需要为 I/O 定义一个事件。根据底层select逻辑,事件必须引用可读对象 - 比如open文件。此外,我们还要存储要读取的数据量。

class AsyncRead:
    def __init__(self, file, amount=1):
        self.file = file
        self.amount = amount
        self._buffer = b'' if 'b' in file.mode else ''

    def __await__(self):
        while len(self._buffer) < self.amount:
            yield self
            # we only get here if ``read`` should not block
            self._buffer += self.file.read(1)
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.file, self.amount, len(self._buffer)
        )

与我们大多数情况一样,AsyncSleep我们只是存储底层系统调用所需的数据。这一次,__await__能够多次恢复 - 直到我们想要的amount被读取。此外,我们还会return存储 I/O 结果,而不仅仅是恢复。

3.3. 使用读取 I/O 增强事件循环

我们的事件循环的基础仍然是run之前定义的。首先,我们需要跟踪读取请求。这不再是一个有序的调度,我们只将读取请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

由于select.select采用超时参数,我们可以用它代替time.sleep

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

这将为我们返回所有可读文件 - 如果有,我们将运行相应的协程。如果没有,则表示我们已经等待当前协程运行足够长的时间。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
    waiting.append((until, coroutine))
    waiting.sort()
    coroutine = waiting_read[readable[0]]

最后,我们必须实际监听读取请求。

# new
if isinstance(command, AsyncSleep):
    ...
elif isinstance(command, AsyncRead):
    ...

3.4. 整合

以上内容有些简化。如果我们总是可以读取,我们需要进行一些切换,以免让休眠的协程挨饿。我们需要处理没有东西可读取或没有东西可等待的情况。然而,最终结果仍然符合 30 LOC 的要求。

def run(*coroutines):
    """Cooperatively run all ``coroutines`` until completion"""
    waiting_read = {}  # type: Dict[file, coroutine]
    waiting = [(0, coroutine) for coroutine in coroutines]
    while waiting or waiting_read:
        # 2. wait until the next coroutine may run or read ...
        try:
            until, coroutine = waiting.pop(0)
        except IndexError:
            until, coroutine = float('inf'), None
            readable, _, _ = select.select(list(waiting_read), [], [])
        else:
            readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
        # ... and select the appropriate one
        if readable and time.time() < until:
            if until and coroutine:
                waiting.append((until, coroutine))
                waiting.sort()
            coroutine = waiting_read.pop(readable[0])
        # 3. run this coroutine
        try:
            command = coroutine.send(None)
        except StopIteration:
            continue
        # 1. sort coroutines by their desired suspension ...
        if isinstance(command, AsyncSleep):
            waiting.append((command.until, coroutine))
            waiting.sort(key=lambda item: item[0])
        # ... or register reads
        elif isinstance(command, AsyncRead):
            waiting_read[command.file] = coroutine

3.5. 协作 I/O

AsyncSleepAsyncRead的实现run现在已完全可以用于休眠和/或读取。与 一样sleepy,我们可以定义一个辅助程序来测试读取:

async def ready(path, amount=1024*32):
    print('read', path, 'at', '%d' % time.time())
    with open(path, 'rb') as file:
        result = await AsyncRead(file, amount)
    print('done', path, 'at', '%d' % time.time())
    print('got', len(result), 'B')

run(sleepy('background', 5), ready('/dev/urandom'))

运行此程序,我们可以看到我们的 I/O 与等待任务交叉:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4.非阻塞I/O

虽然文件 I/O 传达了这一概念,但它并不真正适合这样的库asyncio:文件的select调用总是返回,并且open和都read可能无限期阻塞。这会阻止事件循环的所有协程 - 这是不好的。像这样的库aiofiles使用线程和同步来伪造文件上的非阻塞 I/O 和事件。

但是,套接字确实允许非阻塞 I/O - 并且其固有的延迟使其变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不会阻塞任何东西。

4.1. 非阻塞 I/O 事件

与我们的 类似AsyncRead,我们可以为套接字定义一个暂停和读取事件。我们采用套接字(必须是非阻塞的)而不是文件。此外,我们__await__使用socket.recv而不是file.read

class AsyncRecv:
    def __init__(self, connection, amount=1, read_buffer=1024):
        assert not connection.getblocking(), 'connection must be non-blocking for async recv'
        self.connection = connection
        self.amount = amount
        self.read_buffer = read_buffer
        self._buffer = b''

    def __await__(self):
        while len(self._buffer) < self.amount:
            try:
                self._buffer += self.connection.recv(self.read_buffer)
            except BlockingIOError:
                yield self
        return self._buffer

    def __repr__(self):
        return '%s(file=%s, amount=%d, progress=%d)' % (
            self.__class__.__name__, self.connection, self.amount, len(self._buffer)
        )

与 相比AsyncRead__await__执行真正的非阻塞 I/O。当数据可用时,它始终读取。当没有数据可用时,它始终暂停。这意味着事件循环仅在我们执行有用的工作时被阻塞。

4.2. 解除事件循环阻塞

就事件循环而言,没有什么太大的变化。要监听的事件仍然与文件相同 - 标记为就绪的文件描述符select

# old
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
    waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
    waiting_read[command.connection] = coroutine

此时,应该很明显AsyncReadAsyncRecv是同一种事件。我们可以轻松地将它们重构为具有可交换 I/O 组件的一个事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际 I/O清晰地分开。

4.3. 非阻塞 I/O 的丑陋一面

原则上,此时你应该做的是复制 for 的逻辑readrecv但是AsyncRecv,现在这更丑陋了——当函数在内核中阻塞时,你必须处理早期返回,但将控制权交给你。例如,打开连接比打开文件要长得多:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
    connection.connect((url, port))
except BlockingIOError:
    pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经可以正常工作了。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

附录

github 上的示例代码

解决方案 3:

您的coro脱糖在概念上是正确的,但略有不完整。

await不会无条件暂停,但只有在遇到阻塞调用时才会暂停。它如何知道调用正在阻塞?这取决于正在等待的代码。例如,套接字读取的可等待实现可以简化为:

def read(sock, n):
    # sock must be in non-blocking mode
    try:
        return sock.recv(n)
    except EWOULDBLOCK:
        event_loop.add_reader(sock.fileno, current_task())
        return SUSPEND

在真正的 asyncio 中,等效代码会修改 a 的状态Future,而不是返回魔法值,但概念是相同的。当适当地适应类似生成器的对象时,上述代码可以进行await编辑。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它将糖脱掉,变成类似这样的形式:

data = read(sock, 1024)
if data is SUSPEND:
    return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉发电机的人往往会用yield from自动暂停的方式来描述上述情况。

暂停链一直延续到事件循环,事件循环会注意到协程已暂停,将其从可运行集合中移除,然后继续执行可运行的协程(如果有)。如果没有可运行的协程,则循环将一直等待,select()直到协程感兴趣的文件描述符准备好进行 IO 或超时到期。(事件循环维护文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock可读,它就会重新添加coro到可运行集合中,因此它会从暂停点继续执行。

换句话说:

  1. 默认情况下所有事情都发生在同一个线程中。

  2. 事件循环负责调度协程,并在它们等待的内容(通常是阻塞的 IO 调用或超时)准备就绪时唤醒它们。

为了深入了解协程驱动事件循环,我推荐Dave Beazley 的这次演讲,他在现场观众面前演示了如何从头开始编写事件循环代码。

解决方案 4:

这一切可以归结为 asyncio 正在解决的两个主要挑战:

  • 如何在单个线程中执行多个 I/O?

  • 如何实现协作多任务?

第一点的答案已经存在很长一段时间了,被称为select 循环。在 python 中,它是在selectors 模块中实现的。

第二个问题与协程的概念有关,即可以停止执行并在稍后恢复的函数。在 Python 中,协程是使用生成器和Yiil from语句实现的。这就是async/await 语法背后的原理。

更多资源请参阅此答案。


编辑:解决你关于 goroutines 的评论:

在 asyncio 中,与 goroutine 最接近的其实不是协程,而是任务(请参阅文档中的区别)。在 Python 中,协程(或生成器)对事件循环或 I/O 的概念一无所知。它只是一个函数,可以使用来停止执行,yield同时保持其当前状态,以便稍后恢复。yield from语法允许以透明的方式链接它们。

现在,在 asyncio 任务中,链最底层的协程总是会产生一个Future。然后,这个 Future 会冒泡到事件循环,并集成到内部机制中。当 Future 被其他内部回调设置为完成时,事件循环可以通过将 Future 发送回协程链来恢复任务。


编辑:解决您帖子中的一些问题:

在这种情况下,I/O 实际上是怎样发生的?在单独的线程中?整个解释器是否被暂停,并且 I/O 是否在解释器之外发生?

不会,线程中什么都不会发生。I/O 始终由事件循环管理,主要是通过文件描述符。但是这些文件描述符的注册通常被高级协程隐藏,从而让您做​​些脏活。

I/O 到底是什么意思?如果我的 Python 程序调用 C open() 程序,而它又向内核发送中断,将控制权交给内核,那么 Python 解释器如何知道这一点并能够继续运行其他代码,而内核代码执行实际的 I/O,直到它唤醒最初发送中断的 Python 程序?原则上,Python 解释器如何知道这种情况的发生?

I/O 是任何阻塞调用。在 asyncio 中,所有 I/O 操作都应通过事件循环,因为正如您所说,事件循环无法知道某些同步代码中正在执行阻塞调用。这意味着您不应该open在协程上下文中使用同步。相反,请使用专用库,例如​​提供异步版本的aiofilesopen

解决方案 5:

如果你想象一个机场控制塔,许多飞机等待降落在同一条跑道上。控制塔可以看作是事件循环,跑道是线程。每架飞机都是一个等待执行的单独函数。实际上,一次只能有一架飞机降落在跑道上。asyncio 基本上是通过使用事件循环暂停函数并允许其他函数运行,允许多架飞机同时降落在同一条跑道上。当你使用 await 语法时,它基本上意味着可以暂停飞机(函数并允许其他函数处理

解决方案 6:

它允许您编写单线程异步代码并在 Python 中实现并发。基本上,asyncio为异步编程提供事件循环。例如,如果我们需要在不阻塞主线程的情况下发出请求,我们可以使用该asyncio库。

asyncio 模块允许使用以下元素的组合来实现异步编程:

  • 事件循环:asyncio 模块允许每个进程有一个事件循环。

  • 协程:协程是一种遵循某些约定的生成器。其最有趣的特性是,它可以在执行期间暂停以等待外部处理(I/O 中的某个例程),并在外部处理完成后从停止点返回。

  • Futures:Futures 表示尚未完成的进程。Future 是一个应该在未来产生结果的对象,表示未完成的任务。

  • Tasks:这是asyncio.Future 的一个子类,用于封装和管理协程。我们可以使用 asyncio.Task 对象来封装协程。

其中最重要的概念asyncio是事件循环。事件循环允许您使用回调或协程编写异步代码。理解的关键asyncio是协程和事件循环的术语。协程是有状态的函数,在执行另一个 I/O 操作时可以停止执行。事件循环用于协调协程的执行。

要运行任何协程函数,我们需要获取事件循环。我们可以使用以下代码来实现

    loop = asyncio.get_event_loop()

这为我们提供了一个BaseEventLoop对象。它有一个run_until_complete方法,该方法接受一个协程并运行它直到完成。然后,协程返回一个结果。在低级别上,事件循环执行该BaseEventLoop.rununtilcomplete(future)方法。

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1120  
  IPD(Integrated Product Development,集成产品开发)流程是一种广泛应用于高科技和制造业的产品开发方法论。它通过跨职能团队的紧密协作,将产品开发周期缩短,同时提高产品质量和市场成功率。在IPD流程中,CDCP(Concept Decision Checkpoint,概念决策检查点)是一个关...
IPD培训课程   75  
  研发IPD(集成产品开发)流程作为一种系统化的产品开发方法,已经在许多行业中得到广泛应用。它不仅能够提升产品开发的效率和质量,还能够通过优化流程和资源分配,显著提高客户满意度。客户满意度是企业长期成功的关键因素之一,而IPD流程通过其独特的结构和机制,能够确保产品从概念到市场交付的每个环节都围绕客户需求展开。本文将深入...
IPD流程   66  
  IPD(Integrated Product Development,集成产品开发)流程是一种以跨职能团队协作为核心的产品开发方法,旨在通过优化资源分配、提高沟通效率以及减少返工,从而缩短项目周期并提升产品质量。随着企业对产品上市速度的要求越来越高,IPD流程的应用价值愈发凸显。通过整合产品开发过程中的各个环节,IPD...
IPD项目管理咨询   76  
  跨部门沟通是企业运营中不可或缺的一环,尤其在复杂的产品开发过程中,不同部门之间的协作效率直接影响项目的成败。集成产品开发(IPD)作为一种系统化的项目管理方法,旨在通过优化流程和增强团队协作来提升产品开发的效率和质量。然而,跨部门沟通的复杂性往往成为IPD实施中的一大挑战。部门之间的目标差异、信息不对称以及沟通渠道不畅...
IPD是什么意思   70  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用