Asynchronous Requests with Python requests
- 2024-12-13 08:36:00
Problem description:
I tried the example provided in the documentation of Python's requests library. With async.map(rs) I get the response codes, but I want to get the content of each page requested. This, for example, does not work:
out = async.map(rs)
print out[0].content
Solution 1:
Note
The answer below is not applicable to requests v0.13.0+. The asynchronous functionality was moved from requests to grequests after this question was written. However, you can just replace requests with grequests below and it should work.
I've left this answer as is to reflect the original question, which was about using requests < v0.13.0.
To do multiple tasks with async.map asynchronously you have to:
1. Define a function for what you want to do with each object (your task)
2. Add that function as an event hook in your request
3. Call async.map on a list of all the requests / actions
Example:
from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    #
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks={'response': do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)
Solution 2:
async is now an independent module: grequests.
See here: https://github.com/spyoungtech/grequests
And also: Ideal method for sending multiple HTTP requests over Python?
Installation:
$ pip install grequests
Usage:
Build a stack:
import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)
Send the stack:
grequests.map(rs)
The result looks like:
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
grequests doesn't seem to set a limit on concurrent requests, i.e. when multiple requests are sent to the same server.
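If you do need to cap concurrency, grequests.map accepts a size argument that limits the underlying gevent pool. A minimal sketch (the URL list and the size value here are just illustrative, not from the original answer):

import grequests

urls = ['http://httpbin.org/delay/1'] * 20

# size=5 means at most 5 requests are in flight at any moment
rs = (grequests.get(u) for u in urls)
responses = grequests.map(rs, size=5)
print(responses)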
Solution 3:
Unfortunately, as far as I'm aware, the requests library is not equipped to perform asynchronous requests. You can wrap async/await syntax around requests, but that won't make the underlying requests any less synchronous. If you want truly asynchronous requests, you must use other tooling that provides it. One such solution is aiohttp (Python 3.5.3+). In my experience it works well with the Python 3.7 async/await syntax. Below are three implementations of performing n web requests:
- purely synchronous requests (sync_requests_get_all) using the Python requests library
- synchronous requests (async_requests_get_all) using the Python requests library wrapped in Python 3.7 async/await syntax and asyncio
- a truly asynchronous implementation (async_aiohttp_get_all) with the Python aiohttp library wrapped in Python 3.7 async/await syntax and asyncio
"""
Tested in Python 3.5.10
"""
import time
import asyncio
import requests
import aiohttp
from asgiref import sync
def timed(func):
"""
records approximate durations of function calls
"""
def wrapper(*args, **kwargs):
start = time.time()
print('{name:<30} started'.format(name=func.__name__))
result = func(*args, **kwargs)
duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
name=func.__name__, elapsed=time.time() - start
)
print(duration)
timed.durations.append(duration)
return result
return wrapper
timed.durations = []
@timed
def sync_requests_get_all(urls):
"""
performs synchronous get requests
"""
# use session to reduce network overhead
session = requests.Session()
return [session.get(url).json() for url in urls]
@timed
def async_requests_get_all(urls):
"""
asynchronous wrapper around synchronous requests
"""
session = requests.Session()
# wrap requests.get into an async function
def get(url):
return session.get(url).json()
async_get = sync.sync_to_async(get)
async def get_all(urls):
return await asyncio.gather(*[
async_get(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
@timed
def async_aiohttp_get_all(urls):
"""
performs asynchronous get requests
"""
async def get_all(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[
fetch(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
if __name__ == '__main__':
# this endpoint takes ~3 seconds to respond,
# so a purely synchronous implementation should take
# little more than 30 seconds and a purely asynchronous
# implementation should take little more than 3 seconds.
urls = ['https://postman-echo.com/delay/3']*10
async_aiohttp_get_all(urls)
async_requests_get_all(urls)
sync_requests_get_all(urls)
print('----------------------')
[print(duration) for duration in timed.durations]
On my machine, the output looks like this:
async_aiohttp_get_all started
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all started
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all started
sync_requests_get_all finished in 30.59 seconds
----------------------
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all finished in 30.59 seconds
Solution 4:
I tested both requests-futures and grequests. Grequests is faster, but brings monkey patching and additional dependency problems. requests-futures is several times slower than grequests. I decided to write my own and simply wrapped requests into a ThreadPoolExecutor; it is almost as fast as grequests, but with no external dependencies.
import requests
import concurrent.futures

def get_urls():
    return ["url1", "url2"]

def load_url(url, timeout):
    return requests.get(url, timeout=timeout)

resp_err = 0
resp_ok = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1
Solution 5:
Maybe requests-futures is another choice.
from requests_futures.sessions import FuturesSession
session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second request is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)
It is also recommended in the official documentation. It's a good option if you don't want to use gevent.
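For more than a couple of URLs, the same idea scales by giving FuturesSession a larger thread pool and iterating over the futures as they complete. A rough sketch (the max_workers value and the URL list are placeholders of mine, not from the original answer):

from concurrent.futures import as_completed
from requests_futures.sessions import FuturesSession

urls = ['http://httpbin.org/get?page={}'.format(i) for i in range(10)]

session = FuturesSession(max_workers=10)  # threads backing the session
futures = {session.get(url): url for url in urls}

# process responses in completion order rather than submission order
for future in as_completed(futures):
    response = future.result()
    print(futures[future], response.status_code)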
Solution 6:
You can use httpx for that.
import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
If you want a functional syntax, the gamla lib wraps this into get_async.
Then you can do:
await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])
The 10 is the timeout in seconds.
(Disclaimer: I am its author)
Solution 7:
I have a lot of issues with most of the answers posted - they either use deprecated libraries that have been ported over with limited features, or provide a solution with too much magic on the execution of the request, making error handling difficult. If they do not fall into one of the above categories, they're third-party libraries or deprecated.
Some of the solutions work fine purely for HTTP requests, but fall short for any other kind of request, which is ludicrous. A highly customized solution is not necessary here.
Simply using the Python built-in library asyncio is sufficient to perform asynchronous requests of any type, and it provides enough fluidity for complex, use-case-specific error handling.
import asyncio
import requests

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
        async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(id)))
        async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))
How it works is simple: you create a series of tasks you would like to run asynchronously, then ask the loop to execute those tasks and exit upon completion. No extra libraries subject to lack of maintenance, no missing features needed.
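As a minimal, self-contained sketch of that create-tasks-then-run pattern (using asyncio.sleep as a stand-in for the real gRPC/HTTP work, since perform_grpc_call and do_chores above belong to the author's own codebase):

import asyncio

loop = asyncio.get_event_loop()

async def do_one_thing(i):
    # stand-in for a real request; swap in your own awaitable work here
    await asyncio.sleep(1)
    return i

# create the tasks, then run the loop until they have all completed
async_tasks = [loop.create_task(do_one_thing(i)) for i in range(5)]
results = loop.run_until_complete(asyncio.gather(*async_tasks))
print(results)  # [0, 1, 2, 3, 4], after roughly 1 second instead of 5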
Solution 8:
I know this has been closed for a while, but I thought it might be useful to promote another asynchronous solution built on the requests library.
list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content
The docs are here: http://pythonhosted.org/simple-requests/
Solution 9:
Disclaimer: the following code creates a different thread for each function.
This might be useful in some cases, as it is simpler to use. But be aware that it is not async; it gives the illusion of async by using multiple threads, even though the decorator suggests otherwise.
You can use the decorator below to register a callback that runs once the function has finished executing; the callback must handle the data returned by the function.
Note that after the function is decorated it will return a Future object.
import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner
Example of implementation:
import requests

urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!

def _callback(resp):
    print(resp.url)
    print(resp)
    loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!

# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
    return requests.get(url)

for url in urls:
    get(url)
If you want to see in real time which URLs have been loaded, you can also add the following code at the end:
while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break
Solution 10:
from threading import Thread

threads = list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout=600)
    o...
Solution 11:
I second the suggestion above to use HTTPX, but I often use it in a different way, so I'm adding my answer.
I personally use asyncio.run (introduced in Python 3.7) rather than asyncio.gather, and also prefer the aiostream approach, which can be used in combination with asyncio and httpx.
As in this example I just posted, this style is helpful for processing a set of URLs asynchronously even despite the (common) occurrence of errors. I particularly like how this style makes it clear where response handling happens and how it eases error handling (which I find async calls tend to produce more of).
It's easier to post a simple example of just firing off a bunch of requests asynchronously, but often you also want to handle the response content (compute something from it, perhaps with reference to the original object that the requested URL relates to).
The core of the approach looks like this:
async with httpx.AsyncClient(timeout=timeout) as session:
    ws = stream.repeat(session)
    xs = stream.zip(ws, stream.iterate(urls))
    ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
    process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
    zs = stream.map(ys, process)
    return await zs
where:
- process_thing is an async response-content processing function
- things is the input list (which the generator of URL strings urls came from), e.g. a list of objects/dicts
- pbar is a progress bar (e.g. tqdm.tqdm) [optional but useful]
All of that goes in an async function async_fetch_urlset, which is then run by calling a synchronous "top-level" function named e.g. fetch_things, which runs the coroutine [this is what an async function returns] and manages the event loop:
def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
Since the list passed as input (here things) can be modified in place, you can effectively get output back (as we are used to from synchronous function calls).
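For completeness, here is a hypothetical sketch of what fetch and process_thing might look like under the assumptions above (the names follow the snippets in this answer, but the bodies are mine, not the original author's):

async def fetch(session, url):
    # one request per (session, url) pair produced by stream.zip above
    response = await session.get(url)
    response.raise_for_status()
    return url, response.json()

async def process_thing(result, things, pbar=None, verbose=False):
    url, payload = result
    # store the result back onto the matching input dict: mutating `things`
    # in place is how output "comes back" from the async pipeline
    for thing in things:
        if thing.get("url") == url:
            thing["response"] = payload
    if pbar is not None:
        pbar.update()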
Solution 12:
None of the answers above helped me, because they assume that you have a predefined list of requests, while in my case I need to be able to listen for requests and respond asynchronously (similar to how it works in nodejs).
import grequests

def handle_finished_request(r, **kwargs):
    print(r)

# while True:
def main():
    while True:
        address = listen_to_new_msg()  # based on your server

        # schedule async requests and run 'handle_finished_request' on response
        req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
        job = grequests.send(req)  # does not block! for more info see https://stackoverflow.com/a/16016635/10577976

main()
The handle_finished_request callback is called when a response is received. Note: for some reason a timeout (or no response) does not trigger an error here.
This simple loop can trigger asynchronous requests, similar to how it would work in a nodejs server.
Solution 13:
I would highly recommend hyper_requests (https://github.com/edjones84/hyper-requests), which lets you generate a list of URLs and parameters and then run the requests asynchronously:
import hyper_requests

# Define the request parameters
params = [
    {'url': 'http://httpbin.org/get', 'data': 'value1'},
    {'url': 'http://httpbin.org/get', 'data': 'value3'},
    {'url': 'http://httpbin.org/get', 'data': 'value5'},
    {'url': 'http://httpbin.org/get', 'data': 'value7'},
    {'url': 'http://httpbin.org/get', 'data': 'value9'}
]

# Create an instance of AsyncRequests and execute the requests
returned_data = hyper_requests.get(request_params=params, workers=10)

# Process the returned data
for response in returned_data:
    print(response)