FastAPI python:如何在后台运行线程?
- 2024-12-11 08:47:00
- admin 原创
- 229
问题描述:
我正在使用 FastAPI 在 python 中创建一个服务器,我想要一个与我的 API 无关的函数,每 5 分钟在后台运行一次(比如从 API 检查内容并根据响应打印内容)
我试图创建一个运行该函数的线程start_worker
,但它没有打印任何内容。
有人知道怎么做吗?
def start_worker():
print('[main]: starting worker...')
my_worker = worker.Worker()
my_worker.working_loop() # this function prints "hello" every 5 seconds
if __name__ == '__main__':
print('[main]: starting...')
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
_worker_thread = Thread(target=start_worker, daemon=False)
_worker_thread.start()
解决方案 1:
选项 1
您应该Thread
在调用之前启动您的uvicorn.run
,因为它uvicorn.run
会阻塞线程。
from fastapi import FastAPI
import threading
import uvicorn
import time
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
您也可以Thread
使用 FastAPI 的启动事件来启动,只要它可以在应用程序启动之前运行即可(更新:请参阅下面的选项 3 了解如何使用lifespan
事件,因为startup
事件现在已被弃用)。
@app.on_event("startup")
async def startup_event():
t = BackgroundTasks()
t.start()
选项 2
除了while True:
循环之外,您还可以使用重复事件调度程序来执行后台任务,如下所示:
from fastapi import FastAPI
from threading import Thread
import uvicorn
import sched, time
app = FastAPI()
s = sched.scheduler(time.time, time.sleep)
def print_event(sc):
print("Hello")
sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
s.enter(5, 1, print_event, (s,))
s.run()
@app.on_event("startup")
async def startup_event():
thread = Thread(target=start_scheduler)
thread.start()
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
选项 3
如果您的任务是一个async def
函数(有关 FastAPI 中的vs端点/后台任务的更多详细信息,请参阅此答案),那么您可以使用该函数将任务添加到当前事件循环。该函数接受一个协程对象(即一个函数)并返回一个对象(如果需要,可以将其用于任务,或者它等)。该调用在当前线程的事件循环内创建任务,并与事件循环中的所有其他任务同时在“后台”执行它,并在它们之间切换。def
`async defasyncio.create_task()
create_task()async def
Taskawait
cancel`await
在调用 之前,需要创建一个事件循环,并且该循环在以编程方式(例如,使用)或在终端中(例如,使用)create_task()
启动服务器时已经创建。除了使用,还可以使用获取当前事件,然后调用。uvicorn
`uvicorn.run(app)uvicorn app:app
asyncio.create_task()asyncio.get_running_loop()
loop`loop.create_task()
下面的示例使用最近记录的添加事件的方法lifespan
(使用上下文管理器),即在应用程序启动之前以及应用程序关闭时应执行的代码(请参阅文档,以及此答案和此答案以获取更多详细信息和示例)。 也可以继续使用startup
和shutdown
事件,如前面的选项所示;但是,这些事件处理程序可能会从未来的 FastAPI/Starlette 版本中删除。
from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
async def print_task(s):
while True:
print('Hello')
await asyncio.sleep(s)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Run at startup
asyncio.create_task(print_task(5))
yield
# Run on shutdown (if required)
print('Shutting down...')
app = FastAPI(lifespan=lifespan)
其他解决方案
替代解决方案可能包括使用ApScheduler
,更具体地说AsyncIOScheduler
,如本答案所示。