如何使用 FastAPI 上传文件?
- 2024-12-02 08:41:00
- admin 原创
- 256
问题描述:
我按照官方文档使用 FastAPI 上传文件,如下所示:
@app.post("/create_file")
async def create_file(file: UploadFile = File(...)):
file2store = await file.read()
# some code to store the BytesIO(file2store) to the other database
当我使用 Python 请求库发送请求时,如下所示:
f = open(".../file.txt", 'rb')
files = {"file": (f.name, f, "multipart/form-data")}
requests.post(url="SERVER_URL/create_file", files=files)
该file2store
变量始终为空。有时(很少见),它可以获取文件字节,但几乎所有时间它都是空的,所以我无法在其他数据库上恢复该文件。
我也尝试了bytes
而不是UploadFile
,但得到的结果相同。我的代码有问题,还是我使用 FastAPI 上传文件的方式错误?
解决方案 1:
首先,根据FastAPI 文档,你需要安装python-multipart
— 如果尚未安装 — 因为上传的文件将作为“表单数据”发送。例如:
pip install python-multipart
下面的示例使用对象.file
的属性UploadFile
来获取实际的 Python 文件(即SpooledTemporaryFile
),这允许您调用SpooledTemporaryFile
的方法,例如和.read()
,.close()
而不必await
它们。 但是,在这种情况下,用定义端点非常重要def
- 否则,如果端点是用定义的async def
,则此类操作将阻止服务器,直到它们完成。 在 FastAPI 中,正常def
端点在外部线程池中运行,然后await
被编辑,而不是直接调用(因为它会阻止服务器)。 我强烈建议您看一下这个答案def
,它解释了和端点之间的区别async def
,并提供了在需要在端点内运行阻塞操作时的许多解决方案async def
。
SpooledTemporaryFile
FastAPI/Starlette 使用的属性max_size
设置为 1 MB,这意味着数据在内存中缓冲,直到文件大小超过 1 MB,此时数据将写入磁盘上的临时文件(位于操作系统的临时目录下)。因此,如果您上传的文件大于 1 MB,则不会存储在内存中,调用file.file.read()
实际上会将数据从磁盘读入内存。因此,如果文件太大而无法放入服务器的 RAM 中,您应该分块读取文件并一次处理一个块,如下面“分块读取文件”部分中所述。
如上所述以及在此答案def
中也解释过,FastAPI / Starlette 使用 AnyIO 线程在外部线程池中运行阻塞函数(例如用 normal 定义的端点) ,然后运行await
它们(以便 FastAPI 仍然可以异步工作),以防止它们阻塞事件循环(主线程的事件循环),从而阻塞整个服务器。因此,每次 HTTP 请求到达用 normal 定义的端点时def
,都会生成一个新线程(或者如果可用,将使用一个空闲线程),因此,根据项目的要求、预期的流量(即同时访问您的 API 的用户数量),以及 API 中最终将在该线程池中运行的任何其他阻塞函数(有关更多详细信息,请参阅上面链接的答案),您可能需要调整该线程池中的最大线程数(有关如何执行此操作,请参阅上面链接的答案)。
但是,您应该始终尽可能使用异步代码(即使用async
/ await
),因为async
代码直接在单个线程(在本例中为主线程)中运行的事件循环中运行。一种选择是定义端点async def
并使用FastAPI 提供的异步 read()
/ write()
/ /etc. 文件方法,如此答案中所示。但是,您应该注意,正如此答案中所解释的那样,FastAPI 在后台实际上是在与前面描述的外部线程池不同的线程中调用相应的同步Python File 方法。因此,它可能会产生也可能不会产生很大的不同(在选择一种方法而不是另一种方法之前,请务必执行并比较测试)。close()
请注意,在本答案的底部以及本答案中,解释并演示了另一种方法,即如何使用 Starlette分块上传大文件request.stream()
,这可以大大减少上传文件所需的时间,并避免使用该线程池中的线程。因此,我强烈建议您看一下。
上传单个文件
应用程序
from fastapi import File, UploadFile, HTTPException
@app.post("/upload")
def upload(file: UploadFile = File(...)):
try:
contents = file.file.read()
with open(file.filename, 'wb') as f:
f.write(contents)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfully uploaded {file.filename}"}
分块读取文件
如前所述,以及在此答案中,如果文件太大而无法放入内存中(例如,如果您有 8GB 的 RAM,则无法加载 50GB 的文件(更不用说可用的 RAM 总是小于您机器上安装的总数量,因为其他应用程序将使用部分 RAM)——您应该将文件分块加载到内存中,然后一次处理一个块的数据。但是,此方法可能需要更长时间才能完成,具体取决于您选择的块大小——在下面的示例中,块大小为1024 * 1024
字节(即 1MB)。您可以根据需要调整块大小。
from fastapi import File, UploadFile, HTTPException
@app.post("/upload")
def upload(file: UploadFile = File(...)):
try:
with open(file.filename, 'wb') as f:
while contents := file.file.read(1024 * 1024):
f.write(contents)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfully uploaded {file.filename}"}
另一个选项是使用shutil.copyfileobj()
,它用于将一个file-like
对象的内容复制到另一个file-like
对象(也请查看此答案)。默认情况下,数据以块的形式读取,默认缓冲区(块)大小为 1MB(即1024 * 1024
字节)(对于 Windows)和其他平台的 64KB,如源代码所示。您可以通过传递可选参数来指定缓冲区大小length
。注意:如果length
传递了负值,则将读取整个文件的内容——f.read()
另请参阅,其.copyfileobj()
在后台使用(如源代码所示)。
from fastapi import File, UploadFile, HTTPException
import shutil
@app.post("/upload")
def upload(file: UploadFile = File(...)):
try:
with open(file.filename, 'wb') as f:
shutil.copyfileobj(file.file, f)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfully uploaded {file.filename}"}
test.py (使用requests
)
import requests
url = 'http://127.0.0.1:8000/upload'
file = {'file': open('images/1.png', 'rb')}
resp = requests.post(url=url, files=file)
print(resp.json())
test.py (使用httpx
)
import httpx
url = 'http://127.0.0.1:8000/upload'
file = {'file': open('images/1.png', 'rb')}
resp = httpx.post(url=url, files=file)
print(resp.json())
有关 HTML<form>
示例,请参见此处。
上传多个文件(列表)
应用程序
from fastapi import File, UploadFile, HTTPException
from typing import List
@app.post("/upload")
def upload(files: List[UploadFile] = File(...)):
for file in files:
try:
contents = file.file.read()
with open(file.filename, 'wb') as f:
f.write(contents)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfuly uploaded {[file.filename for file in files]}"}
分块读取文件
正如本答案前面所述,如果您预计某些文件相当大,但没有足够的 RAM 来容纳从头到尾的所有数据,那么您应该将文件分块加载到内存中,这样每次处理一个块的数据(注意:根据需要调整块大小,下面是1024 * 1024
字节)。
from fastapi import File, UploadFile, HTTPException
from typing import List
@app.post("/upload")
def upload(files: List[UploadFile] = File(...)):
for file in files:
try:
with open(file.filename, 'wb') as f:
while contents := file.file.read(1024 * 1024):
f.write(contents)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfuly uploaded {[file.filename for file in files]}"}
或者使用shutil.copyfileobj()
:
from fastapi import File, UploadFile, HTTPException
from typing import List
import shutil
@app.post("/upload")
def upload(files: List[UploadFile] = File(...)):
for file in files:
try:
with open(file.filename, 'wb') as f:
shutil.copyfileobj(file.file, f)
except Exception:
raise HTTPException(status_code=500, detail='Something went wrong')
finally:
file.file.close()
return {"message": f"Successfuly uploaded {[file.filename for file in files]}"}
test.py (使用requests
)
import requests
url = 'http://127.0.0.1:8000/upload'
files = [('files', open('images/1.png', 'rb')), ('files', open('images/2.png', 'rb'))]
resp = requests.post(url=url, files=files)
print(resp.json())
test.py (使用httpx
)
import httpx
url = 'http://127.0.0.1:8000/upload'
files = [('files', open('images/1.png', 'rb')), ('files', open('images/2.png', 'rb'))]
resp = httpx.post(url=url, files=files)
print(resp.json())
有关 HTML<form>
示例,请参见此处。
解决方案 2:
@app.post("/create_file/")
async def image(image: UploadFile = File(...)):
print(image.file)
# print('../'+os.path.isdir(os.getcwd()+"images"),"*************")
try:
os.mkdir("images")
print(os.getcwd())
except Exception as e:
print(e)
file_name = os.getcwd()+"/images/"+image.filename.replace(" ", "-")
with open(file_name,'wb+') as f:
f.write(image.file.read())
f.close()
file = jsonable_encoder({"imagePath":file_name})
new_image = await add_image(file)
return {"filename": new_image}