使用 Celery 取消已经执行的任务?
- 2025-03-13 09:14:00
- admin 原创
- 12
问题描述:
我一直在阅读文档并进行搜索,但似乎找不到直接的答案:
您能取消已经执行的任务吗?(例如,任务已经开始,需要一段时间,并且进行到一半时需要取消)
我在Celery FAQ 的文档中找到了这一点
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
但我不清楚这是否会取消排队的任务,或者是否会终止工作器上正在运行的进程。感谢您提供的任何信息!
解决方案 1:
撤销取消任务执行。如果任务被撤销,工作进程将忽略该任务并且不会执行它。如果您不使用持久撤销,您的任务可以在工作进程重启后执行。
https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes
revoke 有一个终止选项,默认情况下为False。如果需要终止正在执行的任务,则需要将终止设置为True。
>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)
https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
解决方案 2:
在 Celery 3.1 中,撤销任务的 API发生了改变。
根据Celery FAQ,您应该使用 result.revoke:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
或者如果你只有任务 ID:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
解决方案 3:
@0x00mh 的答案是正确的,但是最近的 celery文档说使用该terminate
选项是“管理员的最后手段”,因为您可能会意外终止与此同时开始执行的另一个任务。可能更好的解决方案是结合使用terminate=True
(signal='SIGUSR1'
这会导致任务中引发 SoftTimeLimitExceeded 异常)。
解决方案 4:
你可以使用代理和后端定义 celery 应用程序,如下所示:
from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)
当您运行发送任务时它会返回任务的唯一 ID:
task_id = celeryapp.send_task('run.send_email', queue = "demo")
要撤销任务,您需要 celery 应用程序和任务 ID:
celeryapp.control.revoke(task_id, terminate=True)
解决方案 5:
根据 5.2.3 文档,可以运行以下命令:
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
在哪里celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
文档链接:https://docs.celeryq.dev/en/stable/reference/celery.app.control.html? highlight=revoke#celery.app.control.Control.revoke
解决方案 6:
此外,不尽人意的是,还有另外一种方法(abort task)来停止任务,但是有很多不可靠性,更多细节参见:
http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html
解决方案 7:
from celery.app import default_app
revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)
解决方案 8:
请参阅以下任务选项:time_limit、soft_time_limit(或者您可以为工作者设置它)。如果您不仅想控制执行时间,请参阅apply_async 方法的expires参数。
解决方案 9:
from celery.result import AsyncResult
task = AsyncResult(task_id)
task.revoke()