FastAPI StreamingResponse 无法使用生成器函数进行流式传输
- 2024-12-31 08:37:00
- admin 原创
- 136
问题描述:
我有一个相对简单的 FastAPI 应用,它接受查询并从 ChatGPT 的 API 流回响应。ChatGPT 正在流回结果,我可以看到它在进入时被打印到控制台。
不起作用的是StreamingResponse
通过 FastAPI 返回。响应被一起发送。我真的不知道为什么这不起作用。
以下是 FastAPI 应用程序代码:
import os
import time
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
auth_scheme = HTTPBearer()
app = fastapi.FastAPI()
openai.api_key = os.environ["OPENAI_API_KEY"]
def ask_statesman(query: str):
#prompt = router(query)
completion_reason = None
response = ""
while not completion_reason or completion_reason == "length":
openai_stream = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": query}],
temperature=0.0,
stream=True,
)
for line in openai_stream:
completion_reason = line["choices"][0]["finish_reason"]
if "content" in line["choices"][0].delta:
current_response = line["choices"][0].delta.content
print(current_response)
yield current_response
time.sleep(0.25)
@app.post("/")
async def request_handler(auth_key: str, query: str):
if auth_key != "123":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": auth_scheme.scheme_name},
)
else:
stream_response = ask_statesman(query)
return StreamingResponse(stream_response, media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
下面是一个test.py
用来测试这个的非常简单的文件:
import requests
query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}
response = requests.post(url, params=params, stream=True)
for chunk in response.iter_lines():
if chunk:
print(chunk.decode("utf-8"))
解决方案 1:
POST
首先,使用request 向服务器请求数据不是一个好习惯。GET
对于你的情况,使用 request 会更合适。除此之外,你不应该发送凭证,例如auth_key
作为 URL 的一部分(即使用查询字符串),而应该使用Headers
and/or Cookies
(使用HTTPS
)。请查看此答案以获取有关标头和 cookie 概念的更多详细信息和示例,以及使用查询参数时所涉及的风险。有关此主题的有用帖子也可以在这里和这里找到,以及这里、这里和这里。
其次,如果你正在 的生成器函数内执行阻塞操作(即阻塞 I/O 密集型或 CPU 密集型任务)StreamingResponse
,则应使用def
而不是来定义生成器函数async def
,因为否则,阻塞操作以及time.sleep()
生成器内部使用的函数都会阻塞事件循环。 如此处所述,如果用于流式传输响应主体的函数是普通def
生成器而不是生成async def
器,则 FastAPI 将使用iterate_in_threadpool()
在单独的线程中运行迭代器/生成器,然后将其await
编辑 - 请参阅StreamingResponse
的相关源代码。 如果你更喜欢使用async def
生成器,请确保在外部ThreadPool
(或ProcessPool
)中执行任何阻塞操作await
,并使用 而await asyncio.sleep()
不是time.sleep()
,以防你需要在操作执行中添加延迟。 请查看这个详细的答案以了解更多详细信息和示例。
第三,您使用的是requests
'函数,该函数一次一行地iter_lines()
迭代响应数据。但是,如果响应数据不包含任何换行符(即),则在客户端控制台上看不到数据在到达时被打印,直到客户端收到整个响应并将其作为一个整体打印出来。在这种情况下,您应该根据需要使用并指定(下面的示例中演示了这两种情况)。`iter_content()
chunk_size`
最后,如果您希望StreamingResponse
在每个 Web 浏览器(包括 Chrome)中都能正常工作(即能够看到流入的数据),则应将 指定media_type
为不同于 的类型text/plain
(例如application/json
或text/event-stream
,请参阅此处),或禁用MIME 嗅探。如此处所述,浏览器将开始缓冲text/plain
一定量的响应(大约 1445 字节,如此处所述),以检查收到的内容是否实际上是纯文本。为避免这种情况,您可以将 设置media_type
为text/event-stream
(用于服务器发送事件),或继续使用text/plain
,但将X-Content-Type-Options
响应标头设置为nosniff
,这将禁用 MIME 嗅探(下面的示例中演示了这两个选项)。
工作示例
应用程序
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def fake_data_streamer():
for i in range(10):
yield b'some fake data
'
await asyncio.sleep(0.5)
# If your generator contains blocking operations such as time.sleep(), then define the
# generator function with normal `def`. Alternatively, use `async def` and run any
# blocking operations in an external ThreadPool/ProcessPool. (see 2nd paragraph of this answer)
'''
import time
def fake_data_streamer():
for i in range(10):
yield b'some fake data
'
time.sleep(0.5)
'''
@app.get('/')
async def main():
return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
# or, use:
'''
headers = {'X-Content-Type-Options': 'nosniff'}
return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')
'''
test.py(使用 Python requests
)
import requests
url = "http://localhost:8000/"
with requests.get(url, stream=True) as r:
for chunk in r.iter_content(1024): # or, for line in r.iter_lines():
print(chunk)
test.py (使用httpx
— 请参阅这个,以及这个和这个httpx
了解使用的好处requests
)
import httpx
url = 'http://127.0.0.1:8000/'
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw(): # or, for line in r.iter_lines():
print(chunk)
解决方案 2:
如果您选择使用 Langchain 与 OpenAI 交互(我强烈推荐),它提供了流方法,可以有效地返回一个生成器。
对上面的Chris代码进行了轻微的修改,
api.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI
llm = OpenAI(
streaming=True,
verbose=True,
temperature=0,
)
app = FastAPI()
def chat_gpt_streamer(query: str):
for resp in llm.stream(query):
yield resp["choices"][0]["text"]
@app.get('/streaming/ask')
async def main(query: str):
return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
类似地,您可以使用 httpx 或请求进行测试(再次从 Chris 的代码复制粘贴):
test.py
import httpx
url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw(): # or, for line in r.iter_lines():
print(chunk)
解决方案 3:
可能会考虑查看服务器发送事件: https: //github.com/sysid/sse-starlette
首先安装库:pip install sse-starlette
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time
app = FastAPI()
def data_streamer():
for i in range(10):
yield f"_{i}_".encode("utf-8")
time.sleep(1)
@app.get('/')
async def main():
return EventSourceResponse(data_streamer(), media_type='text/event-stream')
解决方案 4:
如果测试,curl
请确保使用该-N
标志,这样它就不会缓冲你的响应
在 docker 容器中测试ollama我发现以下命令会保留响应流,直到出现换行符或响应完成,然后再打印到 stdout
curl -X POST "http://localhost:12345/api/generate" -H "Content-Type: application/json" -d '{"model": "wizard-vicuna-uncensored", "prompt": "why is the sky blue? be verbose"}'
此命令提供所需的打字机 UX
curl -N -X POST "http://localhost:12345/api/generate" -H "Content-Type: application/json" -d '{"model": "wizard-vicuna-uncensored", "prompt": "why is the sky blue? be verbose"}'
更多技术细节可参阅本文
解决方案 5:
在
ask_statesman
函数中,将yield current_response
语句更改为yield{"data": current_response}
。这会将每个响应行包装到带有键的字典中"data"
。在
request_handler
函数中,不是stream_response
直接返回,而是返回一个生成器表达式,该表达式生成包装在字典中的每一行响应,ask_statesman
如上所示。以下是修改后的代码:
import os
import time
import openai
import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
auth_scheme = HTTPBearer()
app = fastapi.FastAPI()
openai.api_key = os.environ["OPENAI_API_KEY"]
def ask_statesman(query: str):
#prompt = router(query)
completion_reason = None
response = ""
while not completion_reason or completion_reason == "length":
openai_stream = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": query}],
temperature=0.0,
stream=True,
)
for line in openai_stream:
completion_reason = line["choices"][0]["finish_reason"]
if "content" in line["choices"][0].delta:
current_response = line["choices"][0].delta.content
print(current_response)
yield {"data": current_response}
time.sleep(0.25)
@app.post("/")
async def request_handler(auth_key: str, query: str):
if auth_key != "123":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": auth_scheme.scheme_name},
)
else:
return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")