如何获取线程的返回值?
- 2024-11-27 10:43:00
- admin 原创
- 141
问题描述:
下面的函数foo
返回一个字符串'foo'
。如何获取'foo'
从线程目标返回的值?
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
上面显示的“一种明显的方法”不起作用:thread.join()
返回None
。
解决方案 1:
我见过的一种方法是将可变对象(例如列表或字典)连同索引或其他某种标识符一起传递给线程的构造函数。然后线程可以将其结果存储在该对象的专用槽中。例如:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
如果您确实想join()
返回被调用函数的返回值,您可以使用Thread
如下所示的子类来执行此操作:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
由于一些名称混乱,这变得有点复杂,并且它访问特定于Thread
实现的“私有”数据结构......但它有效。
对于 Python 3:
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
解决方案 2:
值得一提的是,multiprocessing
模块使用类提供了一个很好的接口Pool
。如果您想要坚持使用线程而不是进程,那么您可以使用类multiprocessing.pool.ThreadPool
作为替代品。
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
解决方案 3:
在 Python 3.2+ 中,stdlibconcurrent.futures
模块为 提供了更高级别的 API threading
,包括将工作线程的返回值或异常传递回主线程。您可以在实例上调用该result()
方法Future
,它将等到线程完成后才返回线程函数的结果值。
import concurrent.futures
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
解决方案 4:
Jake 的回答很好,但如果你不想使用线程池(你不知道需要多少个线程,但根据需要创建它们),那么在线程之间传输信息的一个好方法是内置的Queue.Queue类,因为它提供了线程安全性。
我创建了以下装饰器,使其以类似于线程池的方式运行:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
然后你就可以用它作为:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
每次调用装饰函数时都会创建一个新线程,并返回一个包含将接收结果的队列的 Thread 对象。
更新
自从我发布这个答案已经有一段时间了,但是它仍然得到了浏览量,所以我想我会更新它以反映我在新版本的 Python 中执行此操作的方式:
Python 3.2 添加了模块concurrent.futures
,它为并行任务提供了高级接口。它提供了ThreadPoolExecutor
和ProcessPoolExecutor
,因此您可以使用具有相同 API 的线程或进程池。
这个 API 的一个好处是,提交一个任务并Executor
返回一个Future
对象,该对象将使用您提交的可调用函数的返回值完成。
这使得附加queue
对象变得没有必要,从而大大简化了装饰器:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
如果没有传入,则将使用默认模块线程池执行器。
用法和以前非常相似:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
如果你使用的是 Python 3.4+,那么使用此方法(以及一般的 Future 对象)的一个非常好的功能是,返回的 Future 可以被包装成asyncio.Future
with asyncio.wrap_future
。这使得它可以轻松地与协程配合使用:
result = await asyncio.wrap_future(long_task(10))
如果不需要访问底层concurrent.Future
对象,则可以将包装包含在装饰器中:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
然后,每当你需要将 CPU 密集型或阻塞代码从事件循环线程中推出时,你可以将其放在一个装饰函数中:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
解决方案 5:
另一种不需要更改现有代码的解决方案:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar) # Python 2.x
#print('hello {0}'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result # Python 2.x
#print(result) # Python 3.x
它还可以轻松调整到多线程环境:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar) # Python 2.x
#print('hello {0}'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread's return value
while not que.empty():
result = que.get()
print result # Python 2.x
#print(result) # Python 3.x
解决方案 6:
更新:
我认为有一种更简单、更简洁的方法来保存线程的结果,并且使接口与类几乎相同threading.Thread
(如果有特殊情况,请告诉我 - 我没有像下面的原始帖子那样进行过大量测试):
import threading
class ConciseResult(threading.Thread):
def run(self):
self.result = self._target(*self._args, **self._kwargs)
为了保持稳健并避免潜在的错误:
import threading
class ConciseRobustResult(threading.Thread):
def run(self):
try:
if self._target is not None:
self.result = self._target(*self._args, **self._kwargs)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs
简短解释:我们仅run
重写的方法,threading.Thread
而不修改任何其他内容。这使我们能够使用该类threading.Thread
为我们执行的所有其他操作,而无需担心遗漏潜在的边缘情况,例如_private
属性分配或自定义属性修改(如我的原始帖子中所述)。
通过查看和的run
输出,我们可以验证我们只修改了方法。下面包含的唯一方法/属性/描述符是,其他所有内容都来自继承的基类(请参阅部分)。help(ConciseResult)
`help(ConciseRobustResult)Methods defined here:
runthreading.Thread
Methods inherited from threading.Thread:`
要使用下面的示例代码测试这些实现中的任何一个,请在下面的函数中替换ConciseResult
或。ConciseRobustResult
`ThreadWithResult`main
原始帖子在方法中使用闭包函数init
:
我发现大多数答案都很长,并且需要熟悉其他模块或高级 Python 特性,除非他们已经熟悉答案所讨论的所有内容,否则他们会感到困惑。
简化方法的工作代码:
import threading
class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
def function():
self.result = target(*args, **kwargs)
super().__init__(group=group, target=function, name=name, daemon=daemon)
示例代码:
import time, random
def function_to_thread(n):
count = 0
while count < 3:
print(f'still running thread {n}')
count +=1
time.sleep(3)
result = random.random()
print(f'Return value of thread {n} should be: {result}')
return result
def main():
thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.result)
print(thread2.result)
main()
解释:
我想大大简化事情,所以我创建了一个ThreadWithResult
类并让它从继承。中的threading.Thread
嵌套函数调用我们想要保存其值的线程函数,并在线程执行完成后将该嵌套函数的结果保存为实例属性。function
`__init__`self.result
创建此实例与创建实例相同threading.Thread
。将要在新线程上运行的函数传递给target
参数,将函数可能需要的任何参数传递给args
参数,将任何关键字参数传递给kwargs
参数。
例如
my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))
我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我包含了time
和random
模块来模拟线程的行为,但它们并不是实现原始问题中要求的功能所必需的。
我知道我在问题提出后很久才回答这个问题,但我希望这可以在将来帮助更多的人!
编辑:我创建了save-thread-result
PyPI 包,以便您可以访问上述相同的代码并在项目之间重复使用它(GitHub 代码在这里)。PyPI 包完全扩展了该类,因此您也可以设置要在类上threading.Thread
设置的任何属性!threading.thread
`ThreadWithResult`
上面的原始答案讨论了该子类背后的主要思想,但有关更多信息,请参见此处更详细的解释(来自模块文档字符串)。
快速使用示例:
pip3 install -U save-thread-result # MacOS/Linux
pip install -U save-thread-result # Windows
python3 # MacOS/Linux
python # Windows
from save_thread_result import ThreadWithResult
# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
target = my_function,
args = (my_function_arg1, my_function_arg2, ...)
kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)
thread.start()
thread.join()
if getattr(thread, 'result', None):
print(thread.result)
else:
# thread.result attribute not set - something caused
# the thread to terminate BEFORE the thread finished
# executing the function passed in through the
# `target` argument
print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')
# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)
解决方案 7:
Parris / kindall 的答案 join
/return
答案移植到 Python 3:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
请注意,Thread
该类在 Python 3 中的实现有所不同。
解决方案 8:
我偷了 kindall 的答案并稍微清理了一下。
关键部分是将 args 和 *kwargs 添加到 join() 以处理超时
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
以下是更新后的答案
这是我获得最多赞同的答案,所以我决定用可以在 py2 和 py3 上运行的代码进行更新。
此外,我看到很多对这个问题的回答都表明对 Thread.join() 缺乏理解。有些答案完全无法处理timeout
参数。但是,还有一个极端情况,您应该注意以下情况:(1) 目标函数可以返回None
,(2) 您还将参数传递timeout
给 join()。请参阅“测试 4”以了解这个极端情况。
与 py2 和 py3 一起使用的 ThreadWithReturn 类:
import sys
from threading import Thread
from builtins import super # https://stackoverflow.com/a/30159479
_thread_target_key, _thread_args_key, _thread_kwargs_key = (
('_target', '_args', '_kwargs')
if sys.version_info >= (3, 0) else
('_Thread__target', '_Thread__args', '_Thread__kwargs')
)
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if target is not None:
self._return = target(
*getattr(self, _thread_args_key),
**getattr(self, _thread_kwargs_key)
)
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
一些示例测试如下所示:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
您能指出我们在测试 4 中可能遇到的极端情况吗?
问题是我们期望 giveMe() 返回 None (参见测试 2),但我们也期望 join() 在超时时返回 None。
returned is None
指:
(1)这就是 giveMe() 返回的结果,或者
(2)join()超时
这个例子很简单,因为我们知道 giveMe() 总是会返回 None。但在现实世界中(目标可能合法地返回 None 或其他内容),我们想要明确检查发生了什么。
下面说明了如何解决这个特殊情况:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
解决方案 9:
使用队列:
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
解决方案 10:
我发现最短、最简单的方法是利用 Python 类及其动态属性。您可以使用 从生成的线程上下文中检索当前线程threading.current_thread()
,并将返回值分配给属性。
import threading
def some_target_function():
# Your code here.
threading.current_thread().return_value = "Some return value."
your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()
return_value = your_thread.return_value
print(return_value)
解决方案 11:
我的解决方案是将函数和线程包装在一个类中。不需要使用池、队列或 C 类型变量传递。它也是非阻塞的。您可以检查状态。请参阅代码末尾如何使用它的示例。
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
解决方案 12:
考虑到@iman对@JakeBiesinger回答的评论,我将其重新组合为具有不同数量的线程:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
解决方案 13:
我正在使用这个包装器,它可以轻松地将任何函数转换为在Thread
- 中运行,并处理其返回值或异常。它不会增加Queue
开销。
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
使用示例
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
threading
模块说明
线程函数的舒适返回值和异常处理是常见的“Pythonic”需求,模块确实应该已经提供threading
- 可能直接在标准Thread
类中提供。ThreadPool
对于简单任务来说,开销太大 - 3 个管理线程,很多官僚作风。不幸的是,Thread
布局最初是从 Java 复制而来 - 例如,您可以从仍然无用的第一个(!)构造函数参数中看到group
。
解决方案 14:
根据 kindall 所提到的内容,这是适用于 Python3 的更通用的解决方案。
import threading
class ThreadWithReturnValue(threading.Thread):
def __init__(self, *init_args, **init_kwargs):
threading.Thread.__init__(self, *init_args, **init_kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return
用法
th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code # => 200
解决方案 15:
join
总是返回None
,我认为你应该使用子类Thread
来处理返回代码等等。
解决方案 16:
您可以在线程函数的范围之上定义一个可变变量,并将结果添加到其中。(我还修改了代码以兼容 python3)
returns = {}
def foo(bar):
print('hello {0}'.format(bar))
returns[bar] = 'foo'
from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
这将返回{'world!': 'foo'}
如果你使用函数输入作为结果字典的键,则每个唯一输入都保证在结果中给出一个条目
解决方案 17:
您可以使用pool.apply_async()
来ThreadPool()
返回值,test()
如下所示:
from multiprocessing.pool import ThreadPool
def test(num1, num2):
return num1 + num2
pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5
并且,您还可以使用submit()
来concurrent.futures.ThreadPoolExecutor()
返回值,test()
如下所示:
from concurrent.futures import ThreadPoolExecutor
def test(num1, num2):
return num1 + num2
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5
而且,return
您可以使用数组result
来代替,如下所示:
from threading import Thread
def test(num1, num2, r):
r[0] = num1 + num2 # Instead of "return"
result = [None] # Here
thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5
另外return
,您还可以使用队列result
,如下所示:
from threading import Thread
import queue
def test(num1, num2, q):
q.put(num1 + num2) # Instead of "return"
queue = queue.Queue() # Here
thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'
解决方案 18:
这是一个相当老的问题,但我想分享一个对我有用并有助于我的开发过程的简单解决方案。
这个答案背后的方法论是,“新”目标函数通过所谓的闭包inner
将原始函数的结果(通过该__init__
函数传递)分配给result
包装器的实例属性。
这使得包装器类可以保留返回值以供调用者随时访问。
threading.Thread
注意:虽然尚未考虑yield函数(OP未提及yield函数),但此方法不需要使用任何混乱的方法或类的私有方法。
享受!
from threading import Thread as _Thread
class ThreadWrapper:
def __init__(self, target, *args, **kwargs):
self.result = None
self._target = self._build_threaded_fn(target)
self.thread = _Thread(
target=self._target,
*args,
**kwargs
)
def _build_threaded_fn(self, func):
def inner(*args, **kwargs):
self.result = func(*args, **kwargs)
return inner
此外,您可以运行pytest
以下代码(假设您已安装它)来演示结果:
import time
from commons import ThreadWrapper
def test():
def target():
time.sleep(1)
return 'Hello'
wrapper = ThreadWrapper(target=target)
wrapper.thread.start()
r = wrapper.result
assert r is None
time.sleep(2)
r = wrapper.result
assert r == 'Hello'
解决方案 19:
定义你的目标:
1)提出论点2)用以下方式q
替换任何陈述return foo
`q.put(foo); return`
所以一个函数
def func(a):
ans = a * a
return ans
将成为
def func(a, q):
ans = a * a
q.put(ans)
return
然后你就可以继续这样做
from Queue import Queue
from threading import Thread
ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]
您可以使用函数装饰器/包装器来实现这一点,这样您就可以使用现有的函数而不target
修改它们,但要遵循这个基本方案。
解决方案 20:
GuySoft 的想法很棒,但我认为对象不一定必须从 Thread 继承,并且可以从接口中删除 start():
from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()
def join(self):
self._t.join()
return self._que.get()
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
print(twrv.join()) # prints foo
解决方案 21:
如上所述,多处理池比基本线程慢得多。使用队列(如这里的一些答案中所建议的)是一种非常有效的替代方法。我已将其与字典一起使用,以便能够运行大量小线程并通过将它们与字典组合来恢复多个答案:
#!/usr/bin/env python3
import threading
# use Queue for python2
import queue
import random
LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]
NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put({k: result})
threads = list()
q = queue.Queue()
results = dict()
for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())
print(results)
解决方案 22:
这是我根据@Kindall 的答案创建的版本。
此版本使得您只需输入带有参数的命令即可创建新线程。
这是用 Python 3.8 制作的:
from threading import Thread
from typing import Any
def test(plug, plug2, plug3):
print(f"hello {plug}")
print(f'I am the second plug : {plug2}')
print(plug3)
return 'I am the return Value!'
def test2(msg):
return f'I am from the second test: {msg}'
def test3():
print('hello world')
def NewThread(com, Returning: bool, *arguments) -> Any:
"""
Will create a new thread for a function/command.
:param com: Command to be Executed
:param arguments: Arguments to be sent to Command
:param Returning: True/False Will this command need to return anything
"""
class NewThreadWorker(Thread):
def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
daemon = None):
Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
ntw = NewThreadWorker(target = com, args = (*arguments,))
ntw.start()
if Returning:
return ntw.join()
if __name__ == "__main__":
print(NewThread(test, True, 'hi', 'test', test2('hi')))
NewThread(test3, True)
解决方案 23:
感谢@alec-cureau。下面的代码对我来说运行良好。替换了弃用的 currentThread():
import threading
import time
def helo(name):
print("hello", name)
time.sleep(5)
threading.current_thread().return_value = name
return True
t1 = threading.Thread(target=helo, args=('bhargav',))
t2 = threading.Thread(target=helo, args=('kushal',))
t1.start()
t2.start()
print(t1.join())
print(t2.join())
print( "returned", t1.return_value)
print("returned", t2.return_value)
解决方案 24:
一个常见的解决方案是foo
使用装饰器包装你的函数
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
那么整个代码可能看起来像这样
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result
笔记
一个重要的问题是返回值可能是无序的。(事实上,return value
不一定保存到queue
,因为您可以选择任意线程安全的数据结构)
解决方案 25:
Kindall在 Python3 中的回答
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None
def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
def join(self,timeout=None):
Thread.join(self,timeout)
return self._return
解决方案 26:
我不知道这对你们是否有效,但我选择创建一个全局对象 [主要是字典或嵌套数组],这样函数就可以访问该对象并对其进行变异,我知道这需要更多资源,但我们不是在处理量子科学,所以,我想我们可以提供更多的内存,前提是内存消耗随 CPU 使用率线性增加。这是一个示例代码:
import requests
import json
import string
import random
import threading
import time
dictionary = {}
def get_val1(L):
print('#1')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val2(L):
print('#2')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val3(L):
print('#3')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val4(L):
print('#4')
for n,elem in enumerate(L):
dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
t1 = threading.Thread(target=get_val1,args=(L[0],))
t2 = threading.Thread(target=get_val2,args=(L[1],))
t3 = threading.Thread(target=get_val3,args=(L[2],))
t4 = threading.Thread(target=get_val4,args=(L[3],))
t1.start()
t2.start()
t3.start()
t4.start()
t1.join()
t2.join()
t3.join()
t4.join()
这个程序运行4个线程,每个线程返回一些文本L[i] for i in L的数据,API返回的数据存储在字典中,它是否有用可能因程序而异,对于小型到中型的计算任务,这个对象变异工作得非常快,并使用更多资源的百分之几。
解决方案 27:
忘记所有复杂的解决方案。
只需安装合适的软件包,例如:
https: //pypi.org/project/thread-with-results/
解决方案 28:
我知道这个帖子很老了......但我遇到了同样的问题......如果你愿意使用thread.join()
import threading
class test:
def __init__(self):
self.msg=""
def hello(self,bar):
print('hello {}'.format(bar))
self.msg="foo"
def main(self):
thread = threading.Thread(target=self.hello, args=('world!',))
thread.start()
thread.join()
print(self.msg)
g=test()
g.main()
解决方案 29:
最好的方法...定义一个全局变量,然后在线程函数中更改该变量。无需传入或检索
from threading import Thread
# global var
random_global_var = 5
def function():
global random_global_var
random_global_var += 1
domath = Thread(target=function)
domath.start()
domath.join()
print(random_global_var)
# result: 6
解决方案 30:
# python 3.x
from queue import Queue
from threading import Thread
import time
import numpy as np
def foo(bar):
time.sleep(np.random.randint(1,4,1).item())
# print('hello{0}'.format(bar))
return bar
que=Queue()
threads_list=list()
for i in range(0, 50):
threads_list.append(Thread(target= lambda q, arg1: q.put(foo(arg1)), args=(que, str(i))))
threads_list[-1].start()
for t in threads_list:
t.join()
results=[]
while not que.empty():
result = que.get()
results.append(result)
print(result)
print(results)
print(len(results))
enter code here
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 项目管理必备:盘点2024年13款好用的项目管理软件
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)