asyncio 实际上如何工作?
- 2024-12-23 08:43:00
- admin 原创
- 91
问题描述:
这个问题是由我的另一个问题引发的:如何在 cdef 中等待?
网络上有大量关于的文章和博客文章asyncio
,但它们都很肤浅。我找不到任何关于它asyncio
是如何实际实现的以及是什么让 I/O 异步的信息。我试图阅读源代码,但它有数千行不是最高级别的 C 代码,其中很多都处理辅助对象,但最重要的是,很难将 Python 语法与它将翻译成的 C 代码联系起来。
Asycnio 自己的文档就更没有帮助了。文档中没有关于它如何工作的信息,只有一些关于如何使用它的指南,而且这些指南有时还具有误导性/写得非常糟糕。
我熟悉 Go 的协程实现,并希望 Python 也能做同样的事情。如果是这样的话,我在上面链接的帖子中提出的代码应该可以工作。既然它没有工作,我现在正在试图找出原因。到目前为止,我最好的猜测如下,请纠正我哪里错了:
形式的过程定义
async def foo(): ...
实际上被解释为继承的类的方法coroutine
。也许,
async def
实际上是通过语句分成多个方法await
,其中调用这些方法的对象能够跟踪迄今为止执行的进度。如果上述情况属实,那么,本质上,协程的执行归结为通过某个全局管理器(循环?)调用协程对象的方法。
全局管理器以某种方式(如何?)知道何时由 Python(仅?)代码执行 I/O 操作,并且能够在当前执行方法放弃控制(命中语句
await
)后选择其中一个待处理的协程方法进行执行。
换句话说,这是我尝试将某些asyncio
语法“去糖化”为更易于理解的内容:
async def coro(name):
print('before', name)
await asyncio.sleep()
print('after', name)
asyncio.gather(coro('first'), coro('second'))
# translated from async def coro(name)
class Coro(coroutine):
def before(self, name):
print('before', name)
def after(self, name):
print('after', name)
def __init__(self, name):
self.name = name
self.parts = self.before, self.after
self.pos = 0
def __call__():
self.parts[self.pos](self.name)
self.pos += 1
def done(self):
return self.pos == len(self.parts)
# translated from asyncio.gather()
class AsyncIOManager:
def gather(*coros):
while not every(c.done() for c in coros):
coro = random.choice(coros)
coro()
如果我的猜测是正确的,那么我有一个问题。在这种情况下,I/O 实际上是怎样发生的?在单独的线程中?整个解释器是否被暂停,并且 I/O 发生在解释器之外?I/O 到底是什么意思?如果我的 Python 过程调用 Copen()
过程,而它又向内核发送中断,放弃对它的控制,那么 Python 解释器如何知道这一点并能够继续运行其他代码,而内核代码执行实际的 I/O,直到它唤醒最初发送中断的 Python 过程?原则上,Python 解释器如何知道这种情况的发生?
解决方案 1:
asyncio 如何工作?
在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,请跳过这些。
生成器
生成器是允许我们暂停执行 Python 函数的对象。用户自定义的生成器使用关键字 实现yield
。通过创建包含关键字的普通函数yield
,我们将该函数变成生成器:
>>> def test():
... yield 1
... yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
如您所见,调用next()
生成器会导致解释器加载测试的框架,并返回yield
已赋值的值。next()
再次调用会导致框架再次加载到解释器堆栈中,并继续yield
赋值另一个值。
到第三次next()
调用时,我们的生成器已完成并被StopIteration
抛出。
与发电机通信
生成器的一个鲜为人知的功能是,您可以使用两种方法与它们通信:send()
和throw()
。
>>> def test():
... val = yield 1
... print(val)
... yield 2
... yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception
调用时gen.send()
,该值作为关键字的返回值传递yield
。
gen.throw()
另一方面,允许在生成器内部抛出异常,并在调用异常的同一位置yield
引发异常。
从生成器返回值
从生成器返回一个值,会导致该值被放入StopIteration
异常中。我们稍后可以从异常中恢复该值并根据需要使用它。
>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
看,一个新的关键字:yield from
Python 3.4 增加了一个新关键字:yield from
。该关键字允许我们将任何next()
、send()
和传递throw()
到最内层的嵌套生成器中。如果内部生成器返回一个值,它也是 的返回值yield from
:
>>> def inner():
... inner_result = yield 2
... print('inner', inner_result)
... return 3
...
>>> def outer():
... yield 1
... val = yield from inner()
... print('outer', val)
... yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4
我写了一篇文章来进一步阐述这个话题。
综合起来
在 Python 3.4 中引入 new 关键字后yield from
,我们现在能够在生成器内部创建生成器,就像一条隧道一样,将数据从最内层的生成器传递到最外层的生成器。这为生成器赋予了新的含义 -协程。
协程是可以在运行时停止和恢复的函数。在 Python 中,它们使用async def
关键字定义。与生成器非常相似,它们也使用自己的形式,yield from
即。在 Python 3.5 中引入和await
之前,我们创建协程的方式与创建生成器的方式完全相同(使用而不是)。async
`awaityield from
await`
async def inner():
return 1
async def outer():
await inner()
就像所有迭代器和生成器都实现该__iter__()
方法一样,所有协程都实现了__await__()
允许它们在每次await coro
调用时继续执行的方法。
Python 文档中有一个很好的序列图,您应该查看一下。
在 asyncio 中,除了协程函数之外,我们还有两个重要对象:任务和未来。
期货
Future 是已__await__()
实现该方法的对象,其作用是保存特定状态和结果。状态可以是以下之一:
待定 – 未来没有任何结果或异常设置。
CANCELLED-未来已被取消,使用
fut.cancel()
FINISHED - Future 已经完成,要么通过使用结果集
fut.set_result()
,要么通过使用异常集fut.set_exception()
结果就像您猜测的那样,可以是返回的 Python 对象,也可以是引发的异常。
对象的另一个重要特征future
是它们包含一个名为 的方法add_done_callback()
。此方法允许在任务完成后立即调用函数 - 无论是引发异常还是完成。
任务
任务对象是特殊的 Future,它环绕着协程,并与最内层和最外层的协程进行通信。每次协程await
创建 Future 时,Future 都会一路传回任务(就像在 中一样yield from
),然后任务会接收它。
接下来,任务将自身绑定到未来。它通过调用add_done_callback()
未来来实现这一点。从现在开始,如果未来即将完成,无论是被取消、传递异常还是传递 Python 对象,都将调用任务的回调,并且它将重新存在。
异步
我们必须回答的最后一个热门问题是——IO 是如何实现的?
在 asyncio 内部,我们有一个事件循环。一个任务事件循环。事件循环的工作是每次任务就绪时调用它们,并将所有这些工作协调到一个工作机器上。
事件循环的 IO 部分基于一个名为 的关键函数select
。Select 是一个阻塞函数,由底层操作系统实现,允许等待套接字接收或发送数据。收到数据后,它会被唤醒,并返回已接收数据的套接字或已准备好写入的套接字。
当您尝试通过 asyncio 在套接字上接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果其.send()
缓冲区已满,或者.recv()
缓冲区为空,则将套接字注册到函数select
(只需将其添加到其中一个列表rlist
forrecv
和wlist
for 中send
),并将相应的函数注册await
为一个新创建的future
对象,该对象绑定到该套接字。
当所有可用任务都在等待 Future 时,事件循环将调用select
并等待。当其中一个套接字有传入数据或其send
缓冲区已耗尽时,asyncio 将检查与该套接字绑定的 Future 对象,并将其设置为完成。
现在所有魔法都发生了。未来已设置为完成,之前添加的任务add_done_callback()
重新开始,并调用.send()
恢复最内层协程的协程(由于链式await
),然后您可以从溢出到的附近缓冲区读取新接收的数据。
再次方法链,如果发生以下情况recv()
:
select.select
等待。返回一个已就绪的套接字,其中包含数据。
来自套接字的数据被移入缓冲区。
future.set_result()
被称为。添加自身的任务
add_done_callback()
现已被唤醒。任务调用
.send()
协程,一直进入最内层的协程并将其唤醒。数据正在从缓冲区读取并返回给我们卑微的用户。
总之,asyncio 使用生成器功能,允许暂停和恢复函数。它使用yield from
允许在最内层生成器和最外层生成器之间来回传递数据的功能。它使用所有这些功能来在等待 IO 完成(通过使用 OSselect
函数)时暂停函数执行。
最好的是什么?当一个功能暂停时,另一个功能可能会运行并与 asyncio 这个精巧的结构交错。
解决方案 2:
谈论async/await
和asyncio
并不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有单一的最终答案。
以下是async/await
和asyncio
类库如何工作的一般描述。也就是说,可能还有其他技巧(有……),但除非您自己构建它们,否则它们无关紧要。除非您已经知道足够多的信息而不必问这样的问题,否则差异应该可以忽略不计。
协程与子程序的比较
就像子程序(函数、过程等)一样,协程(生成器等)是调用堆栈和指令指针的抽象:有一个正在执行的代码片段的堆栈,每个代码片段都位于特定的指令上。
def
versus的区别async def
只是为了清晰起见。实际的区别是return
versus yield
。由此,await
或者yield from
从单个调用到整个堆栈的区别来看。
1.1. 子程序
子程序表示一个新的堆栈级别,用于保存局部变量,并单次遍历其指令以到达结尾。考虑这样的子程序:
def subfoo(bar):
qux = 3
return qux * bar
当你运行它时,这意味着
分配堆栈
bar
空间qux
递归执行第一个语句并跳转到下一个语句
一次
return
,将其值推送到调用堆栈清除堆栈(1.)和指令指针(2.)
值得注意的是,4. 表示子程序始终从同一状态开始。函数本身独有的所有内容在完成后都会丢失。函数无法恢复,即使之后有指令return
。
root -\n : - subfoo --\n :/--<---return --/
|
V
1.2. 协程作为持久子程序
协程类似于子程序,但可以在不破坏其状态的情况下退出。考虑这样的协程:
def cofoo(bar):
qux = yield bar # yield marks a break point
return qux
当你运行它时,这意味着
分配堆栈
bar
空间qux
递归执行第一个语句并跳转到下一个语句
一次
yield
,将其值推送到调用堆栈,但存储堆栈和指令指针一旦调用
yield
,恢复堆栈和指令指针并将参数推送到qux
一次
return
,将其值推送到调用堆栈清除堆栈(1.)和指令指针(2.)
请注意添加了 2.1 和 2.2 - 协程可以在预定义点暂停和恢复。这类似于在调用另一个子程序时暂停子程序的方式。不同之处在于,活动协程并不严格绑定到其调用堆栈。相反,暂停的协程是单独的、隔离的堆栈的一部分。
root -\n : - cofoo --\n :/--<+--yield --/
| :
V :
这意味着暂停的协程可以自由存储或在堆栈之间移动。任何有权访问协程的调用堆栈都可以决定恢复它。
1.3. 遍历调用堆栈
到目前为止,我们的协程仅使用 沿调用堆栈向下移动yield
。子程序可以使用和 沿调用堆栈向下return
和向上移动()
。为了完整起见,协程还需要一种沿调用堆栈向上移动的机制。考虑这样的协程:
def wrap():
yield 'before'
yield from cofoo()
yield 'after'
当你运行它时,这意味着它仍然像子程序一样分配堆栈和指令指针。当它暂停时,这仍然像存储子程序一样。
但是,会同时yield from
执行。它会暂停 的堆栈和指令指针,然后运行。请注意 会一直暂停,直到完全完成。无论何时暂停 或发送 ,都会直接连接到调用堆栈。wrap
cofoo
`wrapcofoo
cofoo`cofoo
1.4. 协程一路向下
如上所述,yield from
允许跨另一个中间范围连接两个范围。当递归应用时,这意味着堆栈的顶部可以连接到堆栈的底部。
root -\n : -> coro_a -yield-from-> coro_b --\n :/ <-+------------------------yield ---/
| :
: --+-- coro_a.send----------yield ---\n : coro_b <-/
请注意,root
和coro_b
彼此不了解。这使得协程比回调更简洁:协程仍然像子程序一样建立在 1:1 关系上。协程会暂停并恢复其整个现有执行堆栈,直到常规调用点。
值得注意的是,root
可以恢复任意数量的协程。但是,它永远不能同时恢复多个协程。同一根的协程是并发的,但不是并行的!
1.5. Pythonasync
和await
到目前为止,解释已明确使用了生成器的yield
和yield from
词汇 - 底层功能是相同的。新的 Python3.5 语法async
和await
主要是为了清晰起见而存在。
def foo(): # subroutine?
return None
def foo(): # coroutine?
yield from foofoo() # generator? coroutine?
async def foo(): # coroutine!
await foofoo() # coroutine!
return None
async for
需要and语句是因为如果使用裸and语句async with
则会打断链条。yield from/await
`for`with
简单事件循环的剖析
协程本身没有将控制权移交给另一个协程的概念。它只能将控制权移交给协程堆栈底部的调用者。然后,该调用者可以切换到另一个协程并运行它。
多个协程的根节点通常是一个事件循环:在暂停时,协程会产生一个事件,并希望在该事件上恢复。反过来,事件循环能够有效地等待这些事件的发生。这使其能够决定接下来要运行哪个协程,或者在恢复之前如何等待。
这样的设计意味着循环理解一组预定义的事件。几个协程await
相互执行,直到最终生成一个事件await
。该事件可以通过控制直接与事件循环通信。yield
loop -\n : -> coroutine --await--> event --\n :/ <-+----------------------- yield --/
| :
| : # loop waits for event to happen
| :
: --+-- send(reply) -------- yield --\n : coroutine <--yield-- event <-/
关键在于协程暂停允许事件循环和事件直接通信。中间协程堆栈不需要知道哪个循环正在运行它,也不需要知道事件如何工作。
2.1.1. 时间事件
要处理的最简单的事件是到达某个时间点。这也是线程代码的基本块:线程重复sleep
执行直到条件为真。但是,常规sleep
程序会自行阻止执行 - 我们希望其他协程不会被阻止。相反,我们希望告诉事件循环何时应恢复当前协程堆栈。
2.1.2. 定义事件
事件只是我们可以识别的值 - 无论是通过枚举、类型还是其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以直接允许await
类。
class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until
# used whenever someone ``await``s an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
该类仅存储事件 - 它并没有说明如何实际处理它。
唯一的特殊之处是__await__
- 它就是await
关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。
2.2.1. 等待事件
现在我们有了事件,那么协程如何对它做出反应呢?我们应该能够sleep
通过await
ing 我们的事件来表达等效内容。为了更好地了解正在发生的事情,我们等待了一半的时间:
import time
async def asleep(duration: float):
"""await that ``duration`` seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)
我们可以直接实例化并运行此协程。与生成器类似,使用coroutine.send
运行协程直到yield
得到结果。
coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)
这为我们提供了两个AsyncSleep
事件,然后StopIteration
是协程完成时的一个事件。请注意,唯一的延迟来自time.sleep
循环中!每个事件AsyncSleep
仅存储与当前时间的偏移量。
2.2.2. 事件 + 睡眠
目前,我们有两种独立的机制可供使用:
AsyncSleep
可以从协程内部产生的事件time.sleep
可以等待而不影响协程
值得注意的是,这两者是正交的:任何一个都不会影响或触发另一个。因此,我们可以想出自己的策略来sleep
应对延迟AsyncSleep
。
2.3. 简单的事件循环
如果我们有多个协程,每个协程都可以告诉我们何时需要唤醒。然后,我们可以等到第一个协程需要恢复,然后再等下一个,依此类推。值得注意的是,在每个点上,我们只关心下一个是哪一个。
这使得调度变得简单:
根据期望唤醒时间对协程进行排序
选择第一个想醒来的人
等到这个时间点
运行该协程
从 1 重复。
一个简单的实现不需要任何高级概念。Alist
允许按日期对协程进行排序。等待是常规的time.sleep
。运行协程的工作方式与之前一样coroutine.send
。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
当然,这还有很大的改进空间。我们可以使用堆作为等待队列,或使用分派表来处理事件。我们还可以从中获取返回值StopIteration
并将它们分配给协程。但是,基本原理保持不变。
2.4. 合作等待
事件AsyncSleep
和run
事件循环是定时事件的完整工作实现。
async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)
run(*(sleepy("coroutine %d" % j) for j in range(5)))
这会在五个协程之间协作切换,每个协程暂停 0.1 秒。尽管事件循环是同步的,但它仍会在 0.5 秒内(而不是 2.5 秒)执行工作。每个协程都保持状态并独立运行。
I/O 事件循环
支持 的事件循环sleep
适合轮询。但是,等待文件句柄上的 I/O 可以更高效地完成:操作系统实现 I/O,因此知道哪些句柄已准备就绪。理想情况下,事件循环应支持显式的“I/O 就绪”事件。
3.1.select
调用
Python 已经有一个接口可以向操作系统查询读取 I/O 句柄。当使用要读取或写入的句柄调用时,它会返回准备好读取或写入的句柄:
readable, writable, _ = select.select(rlist, wlist, xlist, timeout)
例如,我们可以open
写入一个文件并等待它准备就绪:
write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])
一旦选择返回,writable
就包含我们打开的文件。
3.2. 基本 I/O 事件
与请求类似AsyncSleep
,我们需要为 I/O 定义一个事件。根据底层select
逻辑,事件必须引用可读对象 - 比如open
文件。此外,我们还要存储要读取的数据量。
class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = b'' if 'b' in file.mode else ''
def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ``read`` should not block
self._buffer += self.file.read(1)
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)
与我们大多数情况一样,AsyncSleep
我们只是存储底层系统调用所需的数据。这一次,__await__
能够多次恢复 - 直到我们想要的amount
被读取。此外,我们还会return
存储 I/O 结果,而不仅仅是恢复。
3.3. 使用读取 I/O 增强事件循环
我们的事件循环的基础仍然是run
之前定义的。首先,我们需要跟踪读取请求。这不再是一个有序的调度,我们只将读取请求映射到协程。
# new
waiting_read = {} # type: Dict[file, coroutine]
由于select.select
采用超时参数,我们可以用它代替time.sleep
。
# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])
这将为我们返回所有可读文件 - 如果有,我们将运行相应的协程。如果没有,则表示我们已经等待当前协程运行足够长的时间。
# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]
最后,我们必须实际监听读取请求。
# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...
3.4. 整合
以上内容有些简化。如果我们总是可以读取,我们需要进行一些切换,以免让休眠的协程挨饿。我们需要处理没有东西可读取或没有东西可等待的情况。然而,最终结果仍然符合 30 LOC 的要求。
def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
waiting_read = {} # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
3.5. 协作 I/O
AsyncSleep
和AsyncRead
的实现run
现在已完全可以用于休眠和/或读取。与 一样sleepy
,我们可以定义一个辅助程序来测试读取:
async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')
run(sleepy('background', 5), ready('/dev/urandom'))
运行此程序,我们可以看到我们的 I/O 与等待任务交叉:
id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B
4.非阻塞I/O
虽然文件 I/O 传达了这一概念,但它并不真正适合这样的库asyncio
:文件的select
调用总是返回,并且open
和都read
可能无限期阻塞。这会阻止事件循环的所有协程 - 这是不好的。像这样的库aiofiles
使用线程和同步来伪造文件上的非阻塞 I/O 和事件。
但是,套接字确实允许非阻塞 I/O - 并且其固有的延迟使其变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不会阻塞任何东西。
4.1. 非阻塞 I/O 事件
与我们的 类似AsyncRead
,我们可以为套接字定义一个暂停和读取事件。我们采用套接字(必须是非阻塞的)而不是文件。此外,我们__await__
使用socket.recv
而不是file.read
。
class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''
def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer
def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)
与 相比AsyncRead
,__await__
执行真正的非阻塞 I/O。当数据可用时,它始终读取。当没有数据可用时,它始终暂停。这意味着事件循环仅在我们执行有用的工作时被阻塞。
4.2. 解除事件循环阻塞
就事件循环而言,没有什么太大的变化。要监听的事件仍然与文件相同 - 标记为就绪的文件描述符select
。
# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine
此时,应该很明显AsyncRead
和AsyncRecv
是同一种事件。我们可以轻松地将它们重构为具有可交换 I/O 组件的一个事件。实际上,事件循环、协程和事件将调度程序、任意中间代码和实际 I/O清晰地分开。
4.3. 非阻塞 I/O 的丑陋一面
原则上,此时你应该做的是复制 for 的逻辑read
。recv
但是AsyncRecv
,现在这更丑陋了——当函数在内核中阻塞时,你必须处理早期返回,但将控制权交给你。例如,打开连接比打开文件要长得多:
# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass
长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经可以正常工作了。
id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5
附录
github 上的示例代码
解决方案 3:
您的coro
脱糖在概念上是正确的,但略有不完整。
await
不会无条件暂停,但只有在遇到阻塞调用时才会暂停。它如何知道调用正在阻塞?这取决于正在等待的代码。例如,套接字读取的可等待实现可以简化为:
def read(sock, n):
# sock must be in non-blocking mode
try:
return sock.recv(n)
except EWOULDBLOCK:
event_loop.add_reader(sock.fileno, current_task())
return SUSPEND
在真正的 asyncio 中,等效代码会修改 a 的状态Future
,而不是返回魔法值,但概念是相同的。当适当地适应类似生成器的对象时,上述代码可以进行await
编辑。
在调用方,当你的协程包含:
data = await read(sock, 1024)
它将糖脱掉,变成类似这样的形式:
data = read(sock, 1024)
if data is SUSPEND:
return SUSPEND
self.pos += 1
self.parts[self.pos](...)
熟悉发电机的人往往会用yield from
自动暂停的方式来描述上述情况。
暂停链一直延续到事件循环,事件循环会注意到协程已暂停,将其从可运行集合中移除,然后继续执行可运行的协程(如果有)。如果没有可运行的协程,则循环将一直等待,select()
直到协程感兴趣的文件描述符准备好进行 IO 或超时到期。(事件循环维护文件描述符到协程的映射。)
在上面的例子中,一旦select()
告诉事件循环sock
可读,它就会重新添加coro
到可运行集合中,因此它会从暂停点继续执行。
换句话说:
默认情况下所有事情都发生在同一个线程中。
事件循环负责调度协程,并在它们等待的内容(通常是阻塞的 IO 调用或超时)准备就绪时唤醒它们。
为了深入了解协程驱动事件循环,我推荐Dave Beazley 的这次演讲,他在现场观众面前演示了如何从头开始编写事件循环代码。
解决方案 4:
这一切可以归结为 asyncio 正在解决的两个主要挑战:
如何在单个线程中执行多个 I/O?
如何实现协作多任务?
第一点的答案已经存在很长一段时间了,被称为select 循环。在 python 中,它是在selectors 模块中实现的。
第二个问题与协程的概念有关,即可以停止执行并在稍后恢复的函数。在 Python 中,协程是使用生成器和Yiil from语句实现的。这就是async/await 语法背后的原理。
更多资源请参阅此答案。
编辑:解决你关于 goroutines 的评论:
在 asyncio 中,与 goroutine 最接近的其实不是协程,而是任务(请参阅文档中的区别)。在 Python 中,协程(或生成器)对事件循环或 I/O 的概念一无所知。它只是一个函数,可以使用来停止执行,yield
同时保持其当前状态,以便稍后恢复。yield from
语法允许以透明的方式链接它们。
现在,在 asyncio 任务中,链最底层的协程总是会产生一个Future。然后,这个 Future 会冒泡到事件循环,并集成到内部机制中。当 Future 被其他内部回调设置为完成时,事件循环可以通过将 Future 发送回协程链来恢复任务。
编辑:解决您帖子中的一些问题:
在这种情况下,I/O 实际上是怎样发生的?在单独的线程中?整个解释器是否被暂停,并且 I/O 是否在解释器之外发生?
不会,线程中什么都不会发生。I/O 始终由事件循环管理,主要是通过文件描述符。但是这些文件描述符的注册通常被高级协程隐藏,从而让您做些脏活。
I/O 到底是什么意思?如果我的 Python 程序调用 C open() 程序,而它又向内核发送中断,将控制权交给内核,那么 Python 解释器如何知道这一点并能够继续运行其他代码,而内核代码执行实际的 I/O,直到它唤醒最初发送中断的 Python 程序?原则上,Python 解释器如何知道这种情况的发生?
I/O 是任何阻塞调用。在 asyncio 中,所有 I/O 操作都应通过事件循环,因为正如您所说,事件循环无法知道某些同步代码中正在执行阻塞调用。这意味着您不应该open
在协程上下文中使用同步。相反,请使用专用库,例如提供异步版本的aiofilesopen
。
解决方案 5:
如果你想象一个机场控制塔,许多飞机等待降落在同一条跑道上。控制塔可以看作是事件循环,跑道是线程。每架飞机都是一个等待执行的单独函数。实际上,一次只能有一架飞机降落在跑道上。asyncio 基本上是通过使用事件循环暂停函数并允许其他函数运行,允许多架飞机同时降落在同一条跑道上。当你使用 await 语法时,它基本上意味着可以暂停飞机(函数并允许其他函数处理
解决方案 6:
它允许您编写单线程异步代码并在 Python 中实现并发。基本上,asyncio
为异步编程提供事件循环。例如,如果我们需要在不阻塞主线程的情况下发出请求,我们可以使用该asyncio
库。
asyncio 模块允许使用以下元素的组合来实现异步编程:
事件循环:asyncio 模块允许每个进程有一个事件循环。
协程:协程是一种遵循某些约定的生成器。其最有趣的特性是,它可以在执行期间暂停以等待外部处理(I/O 中的某个例程),并在外部处理完成后从停止点返回。
Futures:Futures 表示尚未完成的进程。Future 是一个应该在未来产生结果的对象,表示未完成的任务。
Tasks:这是
asyncio
.Future 的一个子类,用于封装和管理协程。我们可以使用 asyncio.Task 对象来封装协程。
其中最重要的概念asyncio
是事件循环。事件循环允许您使用回调或协程编写异步代码。理解的关键asyncio
是协程和事件循环的术语。协程是有状态的函数,在执行另一个 I/O 操作时可以停止执行。事件循环用于协调协程的执行。
要运行任何协程函数,我们需要获取事件循环。我们可以使用以下代码来实现
loop = asyncio.get_event_loop()
这为我们提供了一个BaseEventLoop
对象。它有一个run_until_complete
方法,该方法接受一个协程并运行它直到完成。然后,协程返回一个结果。在低级别上,事件循环执行该BaseEventLoop.rununtilcomplete(future)
方法。
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理必备:盘点2024年13款好用的项目管理软件
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)