“发射后不管” python async/await

2024-12-30 08:42:00
admin
原创
49
摘要:问题描述:有时需要执行一些非关键的异步操作,但我不想等待它完成。在 Tornado 的协程实现中,您只需省略yield关键字即可“触发并忘记”异步函数。我一直在尝试弄清楚如何使用 Python 3.5 中发布的新async/await语法来实现“触发并忘记”。例如,简化的代码片段:async def asyn...

问题描述:

有时需要执行一些非关键的异步操作,但我不想等待它完成。在 Tornado 的协程实现中,您只需省略yield关键字即可“触发并忘记”异步函数。

我一直在尝试弄清楚如何使用 Python 3.5 中发布的新async/await语法来实现“触发并忘记”。例如,简化的代码片段:

async def async_foo():
    print("Do some stuff asynchronously here...")

def bar():
    async_foo()  # fire and forget "async_foo()"

bar()

但实际情况是,它bar()从未执行,而是我们收到运行时警告:

RuntimeWarning: coroutine 'async_foo' was never awaited
  async_foo()  # fire and forget "async_foo()"

解决方案 1:

更新:

如果您使用的是 Python >= 3.7,则请到处替换asyncio.ensure_future它。asyncio.create_task这是生成任务的一种更新、更好的方法。


asyncio.Task 改为“发射后不管”

根据 python 文档,asyncio.Task可以启动一些协程以“在后台”执行。 创建的任务asyncio.ensure_future不会阻止执行(因此函数将立即返回!)。 这看起来像是一种“触发并忘记”的方式,正如您所要求的那样。

import asyncio


async def async_foo():
    print("async_foo started")
    await asyncio.sleep(1)
    print("async_foo done")


async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()

    # btw, you can also create tasks inside non-async funcs

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Do some actions 1
async_foo started
Do some actions 2
async_foo done
Do some actions 3

如果事件循环完成后任务正在执行会怎么样?

请注意,asyncio 期望任务在事件循环完成时完成。因此,如果您将其更改main()为:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')

程序运行完成后你会收到以下警告:

Task was destroyed but it is pending!
task: <Task pending coro=<async_foo() running at [...]

为了防止这种情况,您可以在事件循环完成后等待所有待处理的任务:

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(0.1)
    print('Do some actions 2')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    
    # Let's also finish all running tasks:
    pending = asyncio.Task.all_tasks()
    loop.run_until_complete(asyncio.gather(*pending))

终止任务而不是等待它们

有时你不想等待任务完成(例如,某些任务可能会被创建为永远运行)。在这种情况下,你可以直接执行cancel()它们而不是等待它们:

import asyncio
from contextlib import suppress


async def echo_forever():
    while True:
        print("echo")
        await asyncio.sleep(1)


async def main():
    asyncio.ensure_future(echo_forever())  # fire and forget

    print('Do some actions 1')
    await asyncio.sleep(1)
    print('Do some actions 2')
    await asyncio.sleep(1)
    print('Do some actions 3')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # Let's also cancel all running tasks:
    pending = asyncio.Task.all_tasks()
    for task in pending:
        task.cancel()
        # Now we should await task to execute it's cancellation.
        # Cancelled task raises asyncio.CancelledError that we can suppress:
        with suppress(asyncio.CancelledError):
            loop.run_until_complete(task)

输出:

Do some actions 1
echo
Do some actions 2
echo
Do some actions 3
echo

解决方案 2:

输出:

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

这是一个简单的装饰函数,它将执行推送到后台,并将控制线移动到代码的下一行。

主要优点是,你不必将函数声明为await

import asyncio
import time

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, *kwargs)

    return wrapped

@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

注意:检查我的其他答案thread,它使用不带 的普通方法执行相同操作asyncio

解决方案 3:

这不是完全异步执行,但也许run_in_executor()适合您。

def fire_and_forget(task, *args, **kwargs):
    loop = asyncio.get_event_loop()
    if callable(task):
        return loop.run_in_executor(None, task, *args, **kwargs)
    else:    
        raise TypeError('Task must be a callable')

def foo():
    #asynchronous stuff here


fire_and_forget(foo)

解决方案 4:

如果您出于某种原因无法使用asyncio,那么这里是使用普通线程的实现。请查看我的其他答案以及 Sergey 的答案。

import threading, time

def fire_and_forget(f):
    def wrapped():
        threading.Thread(target=f).start()

    return wrapped

@fire_and_forget
def foo():
    print("foo() started")
    time.sleep(1)
    print("foo() completed")

print("Hello")
foo()
print("I didn't wait for foo()")

生产

>>> Hello
>>> foo() started
>>> I didn't wait for foo()
>>> foo() completed

解决方案 5:

存在未指定的终止问题,因为“发射后不管”没有说明活动必须何时完成(以及最终在程序终止时是否挂起或终止它们)。解决方案是使用上下文管理器。Python 3.11 现在将其作为TaskGroup。

较早的替代方案是aiowire包。它的上下文管理器具有超时选项,并使用“蹦床”设计,允许异步函数返回异步函数。这既避免了后台线程,也避免了无限的异步调用链。

解决方案 6:

def fire_and_forget(f):
    def wrapped(*args, **kwargs):
        threading.Thread(target=functools.partial(f, *args, **kwargs)).start()

    return wrapped

是上面的更好版本——不使用 asyncio

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1019  
  IPD(Integrated Product Development,集成产品开发)是一种以客户需求为核心、跨职能团队协作为基础的产品开发方法。它通过整合市场、研发、制造、供应链等各个环节的资源与信息,实现高效的产品开发流程。IPD不仅是一种方法论,更是一种系统化的管理思维,旨在缩短产品开发周期、降低开发成本、提高产品...
IPD培训课程   0  
  华为的IPD(集成产品开发)流程是全球范围内备受认可的产品开发管理体系,其核心在于通过跨部门协作和系统化的流程管理,提升产品开发效率和质量。在IPD流程中,团队建设与领导力培养是两个至关重要的环节。高效的团队能够确保项目顺利推进,而优秀的领导力则是团队凝聚力和执行力的保障。本文将从团队建设的重要性、领导力在IPD中的核...
IPD集成产品开发流程   0  
  华为的集成产品开发(IPD)流程是其成功的关键因素之一,它不仅提升了产品开发的效率,还通过系统化的风险管理机制确保了项目的顺利推进。在IPD流程中,风险管理被视为贯穿始终的核心环节,其目的是在项目初期识别潜在问题,并在整个开发周期中持续监控和应对风险。通过有效的风险管理,华为能够最大限度地减少项目延误、成本超支和质量问...
IPD结构化流程   0  
  在项目管理领域,CDCP(Critical Decision Control Point)评审是确保项目成功的关键环节之一。CDCP评审的核心在于通过系统化的决策流程,确保项目在每个关键节点都能做出正确的选择,从而降低风险、提高效率并最终实现项目目标。然而,许多项目团队在CDCP评审过程中常常面临决策效率低下、信息不对...
华为IPD流程   0  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用