FastAPI StreamingResponse 无法使用生成器函数进行流式传输

2024-12-31 08:37:00
admin
原创
136
摘要:问题描述:我有一个相对简单的 FastAPI 应用,它接受查询并从 ChatGPT 的 API 流回响应。ChatGPT 正在流回结果,我可以看到它在进入时被打印到控制台。不起作用的是StreamingResponse通过 FastAPI 返回。响应被一起发送。我真的不知道为什么这不起作用。以下是 FastA...

问题描述:

我有一个相对简单的 FastAPI 应用,它接受查询并从 ChatGPT 的 API 流回响应。ChatGPT 正在流回结果,我可以看到它在进入时被打印到控制台。

不起作用的是StreamingResponse通过 FastAPI 返回。响应被一起发送。我真的不知道为什么这不起作用。

以下是 FastAPI 应用程序代码:

import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield current_response
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        stream_response = ask_statesman(query)
        return StreamingResponse(stream_response, media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")

下面是一个test.py用来测试这个的非常简单的文件:

import requests

query = "How tall is the Eiffel tower?"
url = "http://localhost:8000"
params = {"auth_key": "123", "query": query}

response = requests.post(url, params=params, stream=True)

for chunk in response.iter_lines():
    if chunk:
        print(chunk.decode("utf-8"))

解决方案 1:

POST首先,使用request 向服务器请求数据不是一个好习惯。GET对于你的情况,使用 request 会更合适。除此之外,你不应该发送凭证,例如auth_key作为 URL 的一部分(即使用查询字符串),而应该使用Headersand/or Cookies(使用HTTPS)。请查看此答案以获取有关标头和 cookie 概念的更多详细信息和示例,以及使用查询参数时所涉及的风险。有关此主题的有用帖子也可以在这里和这里找到,以及这里、这里和这里。

其次,如果你正在 的生成器函数内执行阻塞操作(即阻塞 I/O 密集型或 CPU 密集型任务)StreamingResponse,则应使用def而不是来定义生成器函数async def,因为否则,阻塞操作以及time.sleep()生成器内部使用的函数都会阻塞事件循环。 如此处所述,如果用于流式传输响应主体的函数是普通def生成器而不是生成async def器,则 FastAPI 将使用iterate_in_threadpool()在单独的线程中运行迭代器/生成器,然后将其await编辑 - 请参阅StreamingResponse的相关源代码。 如果你更喜欢使用async def生成器,请确保在外部ThreadPool(或ProcessPool)中执行任何阻塞操作await,并使用 而await asyncio.sleep()不是time.sleep(),以防你需要在操作执行中添加延迟。 请查看这个详细的答案以了解更多详细信息和示例。

第三,您使用的是requests'函数,该函数一次一行地iter_lines()迭代响应数据。但是,如果响应数据不包含任何换行符(即),则在客户端控制台上看不到数据在到达时被打印,直到客户端收到整个响应并将其作为一个整体打印出来。在这种情况下,您应该根据需要使用并指定(下面的示例中演示了这两种情况)。`
iter_content()chunk_size`

最后,如果您希望StreamingResponse在每个 Web 浏览器(包括 Chrome)中都能正常工作(即能够看到流入的数据),则应将 指定media_type为不同于 的类型text/plain(例如application/jsontext/event-stream,请参阅此处),或禁用MIME 嗅探。如此处所述,浏览器将开始缓冲text/plain一定量的响应(大约 1445 字节,如此处所述),以检查收到的内容是否实际上是纯文本。为避免这种情况,您可以将 设置media_typetext/event-stream(用于服务器发送事件),或继续使用text/plain,但将X-Content-Type-Options响应标头设置为nosniff,这将禁用 MIME 嗅探(下面的示例中演示了这两个选项)。

工作示例

应用程序

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio


app = FastAPI()


async def fake_data_streamer():
    for i in range(10):
        yield b'some fake data

'
        await asyncio.sleep(0.5)


# If your generator contains blocking operations such as time.sleep(), then define the
# generator function with normal `def`. Alternatively, use `async def` and run any 
# blocking operations in an external ThreadPool/ProcessPool. (see 2nd paragraph of this answer)
'''
import time

def fake_data_streamer():
    for i in range(10):
        yield b'some fake data

'
        time.sleep(0.5)
'''        

    
@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer(), media_type='text/event-stream')
    # or, use:
    '''
    headers = {'X-Content-Type-Options': 'nosniff'}
    return StreamingResponse(fake_data_streamer(), headers=headers, media_type='text/plain')
    '''

test.py(使用 Python requests

import requests

url = "http://localhost:8000/"

with requests.get(url, stream=True) as r:
    for chunk in r.iter_content(1024):  # or, for line in r.iter_lines():
        print(chunk)

test.py (使用httpx— 请参阅这个,以及这个和这个httpx了解使用的好处requests)

import httpx

url = 'http://127.0.0.1:8000/'

with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)

解决方案 2:

如果您选择使用 Langchain 与 OpenAI 交互(我强烈推荐),它提供了流方法,可以有效地返回一个生成器。

对上面的Chris代码进行了轻微的修改,

api.py

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.llms import OpenAI


llm = OpenAI(
    streaming=True,
    verbose=True,
    temperature=0,
)

app = FastAPI()


def chat_gpt_streamer(query: str):
    for resp in llm.stream(query):
        yield resp["choices"][0]["text"]


@app.get('/streaming/ask')
async def main(query: str):
    return StreamingResponse(chat_gpt_streamer(query), media_type='text/event-stream')

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")

类似地,您可以使用 httpx 或请求进行测试(再次从 Chris 的代码复制粘贴):

test.py

import httpx

url = 'http://127.0.0.1:8000/streaming/ask?query=How are you, write in 10 sentences'
with httpx.stream('GET', url) as r:
    for chunk in r.iter_raw():  # or, for line in r.iter_lines():
        print(chunk)

解决方案 3:

可能会考虑查看服务器发送事件: https: //github.com/sysid/sse-starlette

首先安装库:pip install sse-starlette

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import time


app = FastAPI()


def data_streamer():
    for i in range(10):
        yield f"_{i}_".encode("utf-8")
        time.sleep(1)


@app.get('/')
async def main():
    return EventSourceResponse(data_streamer(), media_type='text/event-stream')

解决方案 4:

如果测试,curl请确保使用该-N标志,这样它就不会缓冲你的响应

在 docker 容器中测试ollama我发现以下命令会保留响应流,直到出现换行符或响应完成,然后再打印到 stdout

curl -X POST "http://localhost:12345/api/generate" -H "Content-Type: application/json" -d '{"model": "wizard-vicuna-uncensored", "prompt": "why is the sky blue? be verbose"}'

此命令提供所需的打字机 UX

curl -N -X POST "http://localhost:12345/api/generate" -H "Content-Type: application/json" -d '{"model": "wizard-vicuna-uncensored", "prompt": "why is the sky blue? be verbose"}'

更多技术细节可参阅本文

解决方案 5:

  • ask_statesman函数中,将yield current_response
    语句更改为yield {"data": current_response}。这会将每个响应行包装到带有键的字典中"data"

  • request_handler函数中,不是stream_response直接返回,而是返回一个生成器表达式,该表达式生成包装在字典中的每一行响应,ask_statesman如上所示。以下是修改后的代码:

import os
import time

import openai

import fastapi
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse

auth_scheme = HTTPBearer()
app = fastapi.FastAPI()

openai.api_key = os.environ["OPENAI_API_KEY"]


def ask_statesman(query: str):
    #prompt = router(query)
    
    completion_reason = None
    response = ""
    while not completion_reason or completion_reason == "length":
        openai_stream = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": query}],
            temperature=0.0,
            stream=True,
        )
        for line in openai_stream:
            completion_reason = line["choices"][0]["finish_reason"]
            if "content" in line["choices"][0].delta:
                current_response = line["choices"][0].delta.content
                print(current_response)
                yield {"data": current_response}
                time.sleep(0.25)


@app.post("/")
async def request_handler(auth_key: str, query: str):
    if auth_key != "123":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": auth_scheme.scheme_name},
        )
    else:
        return StreamingResponse((line for line in ask_statesman(query)), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, debug=True, log_level="debug")
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用