使用 Python 请求进行异步请求

2024-12-13 08:36:00
admin
原创
127
摘要:问题描述:我尝试了python请求库文档中提供的示例。使用async.map(rs),我得到了响应代码,但我想获取每个请求页面的内容。例如,这不起作用:out = async.map(rs) print out[0].content 解决方案 1:笔记以下答案不适用于请求 v0.13.0+。在写完这个问题后,...

问题描述:

我尝试了python请求库文档中提供的示例。

使用async.map(rs),我得到了响应代码,但我想获取每个请求页面的内容。例如,这不起作用:

out = async.map(rs)
print out[0].content

解决方案 1:

笔记

以下答案不适用于请求 v0.13.0+。在写完这个问题后,异步功能被移至grequestsrequests 。但是,您只需用grequests下面的替换它就可以了。

我保留这个答案是为了反映关于使用请求<v0.13.0的原始问题。


async.map 要异步执行多个任务,您必须:

  1. 定义一个函数来执行你想对每个对象执行的操作(你的任务)

  2. 在您的请求中添加该函数作为事件挂钩

  3. 调用async.map所有请求/操作的列表

例子:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

解决方案 2:

async现在 是 一个 独立 的 模块 : grequests

请参阅此处:https://github.com/spyoungtech/grequests

还有:通过 Python 发送多个 HTTP 请求的理想方法?

安装:

$ pip install grequests

用法:

构建堆栈:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

发送堆栈

grequests.map(rs)

结果看起来像

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests 似乎没有对并发请求设置限制,即当多个请求发送到同一台服务器时。

解决方案 3:

不幸的是,据我所知,请求库不具备执行异步请求的功能。你可以使用async/await语法包装requests,但这不会使底层请求变得不那么同步。如果你想要真正的异步请求,你必须使用提供它的其他工具。一个这样的解决方案是aiohttp(Python 3.5.3+)。根据我的经验,它与 Python 3.7 语法一起使用效果很好async/await。下面我写了三种使用执行 n 个 Web 请求的实现

  1. sync_requests_get_all使用 Pythonrequests库的纯同步请求( )

  2. 使用Python 3.7 语法包装的async_requests_get_allPython库进行同步请求( )和requests`async/await`asyncio

  3. 一个真正的异步实现(async_aiohttp_get_all),其中的 Pythonaiohttp库以 Python 3.7async/await语法包装,并且asyncio

"""
Tested in Python 3.5.10
"""

import time
import asyncio
import requests
import aiohttp

from asgiref import sync

def timed(func):
    """
    records approximate durations of function calls
    """
    def wrapper(*args, **kwargs):
        start = time.time()
        print('{name:<30} started'.format(name=func.__name__))
        result = func(*args, **kwargs)
        duration = "{name:<30} finished in {elapsed:.2f} seconds".format(
            name=func.__name__, elapsed=time.time() - start
        )
        print(duration)
        timed.durations.append(duration)
        return result
    return wrapper

timed.durations = []


@timed
def sync_requests_get_all(urls):
    """
    performs synchronous get requests
    """
    # use session to reduce network overhead
    session = requests.Session()
    return [session.get(url).json() for url in urls]


@timed
def async_requests_get_all(urls):
    """
    asynchronous wrapper around synchronous requests
    """
    session = requests.Session()
    # wrap requests.get into an async function
    def get(url):
        return session.get(url).json()
    async_get = sync.sync_to_async(get)

    async def get_all(urls):
        return await asyncio.gather(*[
            async_get(url) for url in urls
        ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)

@timed
def async_aiohttp_get_all(urls):
    """
    performs asynchronous get requests
    """
    async def get_all(urls):
        async with aiohttp.ClientSession() as session:
            async def fetch(url):
                async with session.get(url) as response:
                    return await response.json()
            return await asyncio.gather(*[
                fetch(url) for url in urls
            ])
    # call get_all as a sync function to be used in a sync context
    return sync.async_to_sync(get_all)(urls)


if __name__ == '__main__':
    # this endpoint takes ~3 seconds to respond,
    # so a purely synchronous implementation should take
    # little more than 30 seconds and a purely asynchronous
    # implementation should take little more than 3 seconds.
    urls = ['https://postman-echo.com/delay/3']*10

    async_aiohttp_get_all(urls)
    async_requests_get_all(urls)
    sync_requests_get_all(urls)
    print('----------------------')
    [print(duration) for duration in timed.durations]

在我的计算机上,输出如下:

async_aiohttp_get_all          started
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         started
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          started
sync_requests_get_all          finished in 30.59 seconds
----------------------
async_aiohttp_get_all          finished in 3.20 seconds
async_requests_get_all         finished in 30.61 seconds
sync_requests_get_all          finished in 30.59 seconds

解决方案 4:

我测试了requests-futures和grequests。Grequests速度更快,但带来了monkey patching和其他依赖项问题。requests-futures比grequests慢几倍。我决定自己写一个,简单地将请求包装到ThreadPoolExecutor中,它几乎和grequests一样快,但没有外部依赖项。

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

解决方案 5:

也许request-futures是另一种选择。

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

在office文档中也有推荐使用。如果你不想用gevent的话,这是一个不错的选择。

解决方案 6:

您可以用它来实现httpx这一点。

import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

如果您想要一个功能语法,gamla lib 会将其包装到get_async

然后你可以做


await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])

10是以秒为单位的超时时间。

(免责声明:我是其作者)

解决方案 7:

我对发布的大多数答案都有很多疑问 - 它们要么使用已移植的弃用库,功能有限,要么提供的解决方案在执行请求时有太多魔力,导致难以处理错误。如果它们不属于上述类别之一,则它们是第三方库或已弃用。

有些解决方案在纯 http 请求中运行良好,但对于其他类型的请求则无法满足要求,这很荒谬。这里不需要高度定制的解决方案。

只需使用 python 内置库asyncio就足以执行任何类型的异步请求,并为复杂和特定用例的错误处理提供足够的流动性。

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

它的工作原理很简单。你创建一系列希望异步执行的任务,然后要求循环执行这些任务并在完成后退出。无需额外的库来避免维护不足,也无需缺少功能。

解决方案 8:

我知道这已经关闭了一段时间,但我认为推广基于请求库构建的另一个异步解决方案可能会很有用。

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

文档在这里:http://pythonhosted.org/simple-requests/

解决方案 9:

免责声明:以下代码为每个函数创建不同的线程

在某些情况下,这可能很有用,因为它使用起来更简单。但请注意,它不是异步的,而是给人一种使用多个线程异步的错觉,尽管装饰器建议这样做。

您可以使用下面的装饰器在函数执行完成后给出一个回调,回调必须处理函数返回的数据。

请注意,函数被修饰后它将返回一个Future对象。

import asyncio

## Decorator implementation of async runner !!
def run_async(callback, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    def inner(func):
        def wrapper(*args, **kwargs):
            def __exec():
                out = func(*args, **kwargs)
                callback(out)
                return out

            return loop.run_in_executor(None, __exec)

        return wrapper

    return inner

实现示例:

urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = []  # OPTIONAL, used for showing realtime, which urls are loaded !!


def _callback(resp):
    print(resp.url)
    print(resp)
    loaded_urls.append((resp.url, resp))  # OPTIONAL, used for showing realtime, which urls are loaded !!


# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
    return requests.get(url)


for url in urls:
    get(url)

如果你希望实时查看哪些 URL 被加载,那么你也可以在末尾添加以下代码:

while True:
    print(loaded_urls)
    if len(loaded_urls) == len(urls):
        break

解决方案 10:

from threading import Thread

threads=list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...

解决方案 11:

我赞同上述使用HTTPX 的建议,但我经常以不同的方式使用它,因此我添加了我的答案。

我个人使用asyncio.run(在 Python 3.7 中引入)而不是asyncio.gather并且更喜欢该aiostream方法,它可以与 asyncio 和 httpx 结合使用。

正如我刚刚发布的这个示例一样,这种风格有助于异步处理一组 URL,即使出现(常见)错误也是如此。我特别喜欢这种风格如何明确响应处理发生的位置,以及简化错误处理(我发现异步调用往往会提供更多错误处理)。

发布一个异步触发一堆请求的简单示例更容易,但通常您还想处理响应内容(使用它来计算某些内容,可能参考您请求的 URL 所涉及的原始对象)。

该方法的核心如下:

async with httpx.AsyncClient(timeout=timeout) as session:
    ws = stream.repeat(session)
    xs = stream.zip(ws, stream.iterate(urls))
    ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
    process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
    zs = stream.map(ys, process)
    return await zs

在哪里:

  • process_thing是一个异步响应内容处理函数

  • things是输入列表(urlsURL 字符串生成器的来源),例如对象/字典列表

  • pbar是一个进度条(例如tqdm.tqdm)[可选但有用]

所有这些都进入一个异步函数async_fetch_urlset,然后通过调用名为 eg 的同步“顶级”函数来运行fetch_things,该函数运行协程[这是异步函数返回的内容]并管理事件循环:

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

由于作为输入传递的列表(这里是things)可以就地修改,因此您可以有效地获取输出(就像我们从同步函数调用中习惯的那样)

解决方案 12:

上述答案都对我没有帮助,因为他们假设你有一个预定义的请求列表,而就我而言,我需要能够监听请求并异步响应(类似于它在 nodejs 中的工作方式)。

def handle_finished_request(r, **kwargs):
    print(r)


# while True:
def main():
    while True:
        address = listen_to_new_msg()  # based on your server

        # schedule async requests and run 'handle_finished_request' on response
        req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
        job = grequests.send(req)  # does not block! for more info see https://stackoverflow.com/a/16016635/10577976


main()

handle_finished_request当收到响应时,将调用回调。注意:由于某种原因超时(或无响应)不会在此处触发错误。

这个简单的循环可以触发异步请求,类似于在 nodejs 服务器中的工作方式

解决方案 13:

我强烈推荐 hyper_requests ( https://github.com/edjones84/hyper-requests ),它可以生成 URL 和参数列表,然后异步运行请求:

import hyper_requests

# Define the request parameters
params = [
    {'url': 'http://httpbin.org/get' , 'data': 'value1'},
    {'url': 'http://httpbin.org/get' , 'data': 'value3'},
    {'url': 'http://httpbin.org/get' , 'data': 'value5'},
    {'url': 'http://httpbin.org/get' , 'data': 'value7'},
    {'url': 'http://httpbin.org/get' , 'data': 'value9'}
]

# Create an instance of AsyncRequests and execute the requests
returned_data = hyper_requests.get(request_params=params, workers=10)

# Process the returned data
for response in returned_data:
    print(response)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1579  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1355  
  信创产品在政府采购中的占比分析随着信息技术的飞速发展以及国家对信息安全重视程度的不断提高,信创产业应运而生并迅速崛起。信创,即信息技术应用创新,旨在实现信息技术领域的自主可控,减少对国外技术的依赖,保障国家信息安全。政府采购作为推动信创产业发展的重要力量,其对信创产品的采购占比情况备受关注。这不仅关系到信创产业的发展前...
信创和国产化的区别   8  
  信创,即信息技术应用创新产业,旨在实现信息技术领域的自主可控,摆脱对国外技术的依赖。近年来,国货国用信创发展势头迅猛,在诸多领域取得了显著成果。这一发展趋势对科技创新产生了深远的推动作用,不仅提升了我国在信息技术领域的自主创新能力,还为经济社会的数字化转型提供了坚实支撑。信创推动核心技术突破信创产业的发展促使企业和科研...
信创工作   9  
  信创技术,即信息技术应用创新产业,旨在实现信息技术领域的自主可控与安全可靠。近年来,信创技术发展迅猛,对中小企业产生了深远的影响,带来了诸多不可忽视的价值。在数字化转型的浪潮中,中小企业面临着激烈的市场竞争和复杂多变的环境,信创技术的出现为它们提供了新的发展机遇和支撑。信创技术对中小企业的影响技术架构变革信创技术促使中...
信创国产化   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用