如何获取线程的返回值?

2024-11-27 10:43:00
admin
原创
141
摘要:问题描述:下面的函数foo返回一个字符串'foo'。如何获取'foo'从线程目标返回的值?from threading import Thread def foo(bar): print('hello {}'.format(bar)) return 'foo' thread = T...

问题描述:

下面的函数foo返回一个字符串'foo'。如何获取'foo'从线程目标返回的值?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

上面显示的“一种明显的方法”不起作用:thread.join()返回None


解决方案 1:

我见过的一种方法是将可变对象(例如列表或字典)连同索引或其他某种标识符一起传递给线程的构造函数。然后线程可以将其结果存储在该对象的专用槽中。例如:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

如果您确实想join()返回被调用函数的返回值,您可以使用Thread如下所示的子类来执行此操作:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

由于一些名称混乱,这变得有点复杂,并且它访问特定于Thread实现的“私有”数据结构......但它有效。

对于 Python 3:

class ThreadWithReturnValue(Thread):
    
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return

解决方案 2:

值得一提的是,multiprocessing模块使用类提供了一个很好的接口Pool。如果您想要坚持使用线程而不是进程,那么您可以使用类multiprocessing.pool.ThreadPool作为替代品。

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.

解决方案 3:

在 Python 3.2+ 中,stdlibconcurrent.futures模块为 提供了更高级别的 API threading,包括将工作线程的返回值或异常传递回主线程。您可以在实例上调用该result()方法Future,它将等到线程完成后才返回线程函数的结果值。

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)

解决方案 4:

Jake 的回答很好,但如果你不想使用线程池(你不知道需要多少个线程,但根据需要创建它们),那么在线程之间传输信息的一个好方法是内置的Queue.Queue类,因为它提供了线程安全性。

我创建了以下装饰器,使其以类似于线程池的方式运行:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

然后你就可以用它作为:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

每次调用装饰函数时都会创建一个新线程,并返回一个包含将接收结果的队列的 Thread 对象。

更新

自从我发布这个答案已经有一段时间了,但是它仍然得到了浏览量,所以我想我会更新它以反映我在新版本的 Python 中执行此操作的方式:

Python 3.2 添加了模块concurrent.futures,它为并行任务提供了高级接口。它提供了ThreadPoolExecutorProcessPoolExecutor,因此您可以使用具有相同 API 的线程或进程池。

这个 API 的一个好处是,提交一个任务并Executor返回一个Future对象,该对象将使用您提交的可调用函数的返回值完成。

这使得附加queue对象变得没有必要,从而大大简化了装饰器:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

如果没有传入,则将使用默认模块线程池执行器。

用法和以前非常相似:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

如果你使用的是 Python 3.4+,那么使用此方法(以及一般的 Future 对象)的一个非常好的功能是,返回的 Future 可以被包装成asyncio.Futurewith asyncio.wrap_future。这使得它可以轻松地与协程配合使用:

result = await asyncio.wrap_future(long_task(10))

如果不需要访问底层concurrent.Future对象,则可以将包装包含在装饰器中:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

然后,每当你需要将 CPU 密集型或阻塞代码从事件循环线程中推出时,你可以将其放在一个装饰函数中:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()

解决方案 5:

另一种不需要更改现有代码的解决方案:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result             # Python 2.x
#print(result)           # Python 3.x

它还可以轻松调整到多线程环境:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result         # Python 2.x
    #print(result)       # Python 3.x

解决方案 6:

更新:

我认为有一种更简单、更简洁的方法来保存线程的结果,并且使接口与类几乎相同threading.Thread(如果有特殊情况,请告诉我 - 我没有像下面的原始帖子那样进行过大量测试):

import threading

class ConciseResult(threading.Thread):
    def run(self):
        self.result = self._target(*self._args, **self._kwargs)

为了保持稳健并避免潜在的错误:

import threading

class ConciseRobustResult(threading.Thread):
    def run(self):
        try:
            if self._target is not None:
                self.result = self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

简短解释:我们run重写的方法,threading.Thread而不修改任何其他内容。这使我们能够使用该类threading.Thread为我们执行的所有其他操作,而无需担心遗漏潜在的边缘情况,例如_private属性分配或自定义属性修改(如我的原始帖子中所述)。

通过查看和的run输出,我们可以验证我们只修改了方法。下面包含的唯一方法/属性/描述符是,其他所有内容都来自继承的基类(请参阅部分)。help(ConciseResult)`help(ConciseRobustResult)Methods defined here:runthreading.ThreadMethods inherited from threading.Thread:`

要使用下面的示例代码测试这些实现中的任何一个,请在下面的函数中替换ConciseResult或。ConciseRobustResult`ThreadWithResult`main

原始帖子在方法中使用闭包函数init

我发现大多数答案都很长,并且需要熟悉其他模块或高级 Python 特性,除非他们已经熟悉答案所讨论的所有内容,否则他们会感到困惑。

简化方法的工作代码:

import threading

class ThreadWithResult(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        def function():
            self.result = target(*args, **kwargs)
        super().__init__(group=group, target=function, name=name, daemon=daemon)

示例代码:

import time, random


def function_to_thread(n):
    count = 0
    while count < 3:
            print(f'still running thread {n}')
            count +=1
            time.sleep(3)
    result = random.random()
    print(f'Return value of thread {n} should be: {result}')
    return result


def main():
    thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
    thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(thread1.result)
    print(thread2.result)

main()

解释:
我想大大简化事情,所以我创建了一个ThreadWithResult类并让它从继承。中的threading.Thread嵌套函数调用我们想要保存其值的线程函数,并在线程执行完成后将该嵌套函数的结果保存为实例属性。function`__init__`self.result

创建此实例与创建实例相同threading.Thread。将要在新线程上运行的函数传递给target参数,将函数可能需要的任何参数传递给args参数,将任何关键字参数传递给kwargs参数。

例如

my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))

我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我包含了timerandom模块来模拟线程的行为,但它们并不是实现原始问题中要求的功能所必需的。

我知道我在问题提出后很久才回答这个问题,但我希望这可以在将来帮助更多的人!


编辑:我创建了save-thread-resultPyPI 包,以便您可以访问上述相同的代码并在项目之间重复使用它(GitHub 代码在这里)。PyPI 包完全扩展了该类,因此您也可以设置要在类上threading.Thread设置的任何属性!threading.thread`ThreadWithResult`

上面的原始答案讨论了该子类背后的主要思想,但有关更多信息,请参见此处更详细的解释(来自模块文档字符串)。

快速使用示例:

pip3 install -U save-thread-result     # MacOS/Linux
pip  install -U save-thread-result     # Windows

python3     # MacOS/Linux
python      # Windows
from save_thread_result import ThreadWithResult

# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
    target = my_function,
    args   = (my_function_arg1, my_function_arg2, ...)
    kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)

thread.start()
thread.join()
if getattr(thread, 'result', None):
    print(thread.result)
else:
    # thread.result attribute not set - something caused
    # the thread to terminate BEFORE the thread finished
    # executing the function passed in through the
    # `target` argument
    print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')

# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)

解决方案 7:

Parris / kindall 的答案 join/return答案移植到 Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

请注意,Thread该类在 Python 3 中的实现有所不同。

解决方案 8:

我偷了 kindall 的答案并稍微清理了一下。

关键部分是将 args 和 *kwargs 添加到 join() 以处理超时

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)
        
        self._return = None
    
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
    
    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)
        
        return self._return

以下是更新后的答案

这是我获得最多赞同的答案,所以我决定用可以在 py2 和 py3 上运行的代码进行更新。

此外,我看到很多对这个问题的回答都表明对 Thread.join() 缺乏理解。有些答案完全无法处理timeout参数。但是,还有一个极端情况,您应该注意以下情况:(1) 目标函数可以返回None,(2) 您还将参数传递timeout给 join()。请参阅“测试 4”以了解这个极端情况。

与 py2 和 py3 一起使用的 ThreadWithReturn 类:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

_thread_target_key, _thread_args_key, _thread_kwargs_key = (
    ('_target', '_args', '_kwargs')
    if sys.version_info >= (3, 0) else
    ('_Thread__target', '_Thread__args', '_Thread__kwargs')
)

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None
    
    def run(self):
        target = getattr(self, _thread_target_key)
        if target is not None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )
    
    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

一些示例测试如下所示:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

您能指出我们在测试 4 中可能遇到的极端情况吗?

问题是我们期望 giveMe() 返回 None (参见测试 2),但我们也期望 join() 在超时时返回 None。

returned is None指:

(1)这就是 giveMe() 返回的结果,或者

(2)join()超时

这个例子很简单,因为我们知道 giveMe() 总是会返回 None。但在现实世界中(目标可能合法地返回 None 或其他内容),我们想要明确检查发生了什么。

下面说明了如何解决这个特殊情况:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()

解决方案 9:

使用队列:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())

解决方案 10:

我发现最短、最简单的方法是利用 Python 类及其动态属性。您可以使用 从生成的线程上下文中检索当前线程threading.current_thread(),并将返回值分配给属性。

import threading

def some_target_function():
    # Your code here.
    threading.current_thread().return_value = "Some return value."

your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()

return_value = your_thread.return_value
print(return_value)

解决方案 11:

我的解决方案是将函数和线程包装在一个类中。不需要使用池、队列或 C 类型变量传递。它也是非阻塞的。您可以检查状态。请参阅代码末尾如何使用它的示例。

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()

解决方案 12:

考虑到@iman@JakeBiesinger回答的评论,我将其重新组合为具有不同数量的线程:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

解决方案 13:

我正在使用这个包装器,它可以轻松地将任何函数转换为在Thread- 中运行,并处理其返回值或异常。它不会增加Queue开销。

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

使用示例

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

threading模块说明

线程函数的舒适返回值和异常处理是常见的“Pythonic”需求,模块确实应该已经提供threading- 可能直接在标准Thread类中提供。ThreadPool对于简单任务来说,开销太大 - 3 个管理线程,很多官僚作风。不幸的是,Thread布局最初是从 Java 复制而来 - 例如,您可以从仍然无用的第一个(!)构造函数参数中看到group

解决方案 14:

根据 kindall 所提到的内容,这是适用于 Python3 的更通用的解决方案。

import threading

class ThreadWithReturnValue(threading.Thread):
    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None
    def run(self):
        self._return = self._target(*self._args, **self._kwargs)
    def join(self):
        threading.Thread.join(self)
        return self._return

用法

        th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
        th.start()
        response = th.join()
        response.status_code  # => 200

解决方案 15:

join总是返回None,我认为你应该使用子类Thread来处理返回代码等等。

解决方案 16:

您可以在线程函数的范围之上定义一个可变变量,并将结果添加到其中。(我还修改了代码以兼容 python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

这将返回{'world!': 'foo'}

如果你使用函数输入作为结果字典的键,则每个唯一输入都保证在结果中给出一个条目

解决方案 17:

您可以使用pool.apply_async()ThreadPool()返回test()如下所示:

from multiprocessing.pool import ThreadPool

def test(num1, num2):
    return num1 + num2

pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

并且,您还可以使用submit()concurrent.futures.ThreadPoolExecutor()返回test()如下所示:

from concurrent.futures import ThreadPoolExecutor

def test(num1, num2):
    return num1 + num2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5

而且,return您可以使用数组result来代替,如下所示:

from threading import Thread

def test(num1, num2, r):
    r[0] = num1 + num2 # Instead of "return"

result = [None] # Here

thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

另外return,您还可以使用队列result,如下所示:

from threading import Thread
import queue

def test(num1, num2, q):
    q.put(num1 + num2) # Instead of "return" 

queue = queue.Queue() # Here

thread = Thread(target=test, args=(2, 3, queue))
thread.start()
thread.join()
print(queue.get()) # '5'

解决方案 18:

这是一个相当老的问题,但我想分享一个对我有用并有助于我的开发过程的简单解决方案。

这个答案背后的方法论是,“新”目标函数通过所谓的闭包inner将原始函数的结果(通过该__init__函数传递)分配给result包装器的实例属性。

这使得包装器类可以保留返回值以供调用者随时访问。

threading.Thread注意:虽然尚未考虑yield函数(OP未提及yield函数),但此方法不需要使用任何混乱的方法或类的私有方法。

享受!

from threading import Thread as _Thread


class ThreadWrapper:
    def __init__(self, target, *args, **kwargs):
        self.result = None
        self._target = self._build_threaded_fn(target)
        self.thread = _Thread(
            target=self._target,
            *args,
            **kwargs
        )

    def _build_threaded_fn(self, func):
        def inner(*args, **kwargs):
            self.result = func(*args, **kwargs)
        return inner

此外,您可以运行pytest以下代码(假设您已安装它)来演示结果:

import time
from commons import ThreadWrapper


def test():

    def target():
        time.sleep(1)
        return 'Hello'

    wrapper = ThreadWrapper(target=target)
    wrapper.thread.start()

    r = wrapper.result
    assert r is None

    time.sleep(2)

    r = wrapper.result
    assert r == 'Hello'

解决方案 19:

定义你的目标:

1)提出论点2)用以下方式q

替换任何陈述return foo`q.put(foo); return`

所以一个函数

def func(a):
    ans = a * a
    return ans

将成为

def func(a, q):
    ans = a * a
    q.put(ans)
    return

然后你就可以继续这样做

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

您可以使用函数装饰器/包装器来实现这一点,这样您就可以使用现有的函数而不target修改它们,但要遵循这个基本方案。

解决方案 20:

GuySoft 的想法很棒,但我认为对象不一定必须从 Thread 继承,并且可以从接口中删除 start():

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo

解决方案 21:

如上所述,多处理池比基本线程慢得多。使用队列(如这里的一些答案中所建议的)是一种非常有效的替代方法。我已将其与字典一起使用,以便能够运行大量小线程并通过将它们与字典组合来恢复多个答案:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)

解决方案 22:

这是我根据@Kindall 的答案创建的版本。

此版本使得您只需输入带有参数的命令即可创建新线程。

这是用 Python 3.8 制作的:

from threading import Thread
from typing import Any

def test(plug, plug2, plug3):
    print(f"hello {plug}")
    print(f'I am the second plug : {plug2}')
    print(plug3)
    return 'I am the return Value!'

def test2(msg):
    return f'I am from the second test: {msg}'

def test3():
    print('hello world')

def NewThread(com, Returning: bool, *arguments) -> Any:
    """
    Will create a new thread for a function/command.

    :param com: Command to be Executed
    :param arguments: Arguments to be sent to Command
    :param Returning: True/False Will this command need to return anything
    """
    class NewThreadWorker(Thread):
        def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
                     daemon = None):
            Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
            
            self._return = None
        
        def run(self):
            if self._target is not None:
                self._return = self._target(*self._args, **self._kwargs)
        
        def join(self):
            Thread.join(self)
            return self._return
    
    ntw = NewThreadWorker(target = com, args = (*arguments,))
    ntw.start()
    if Returning:
        return ntw.join()

if __name__ == "__main__":
    print(NewThread(test, True, 'hi', 'test', test2('hi')))
    NewThread(test3, True)

解决方案 23:

感谢@alec-cureau。下面的代码对我来说运行良好。替换了弃用的 currentThread():

import threading
import time

def helo(name):
    print("hello", name)
    time.sleep(5)
    threading.current_thread().return_value = name
    return True

t1 = threading.Thread(target=helo, args=('bhargav',))
t2 = threading.Thread(target=helo, args=('kushal',))

t1.start()
t2.start()

print(t1.join())
print(t2.join())

print( "returned", t1.return_value)
print("returned", t2.return_value)

解决方案 24:

一个常见的解决方案是foo使用装饰器包装你的函数

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

那么整个代码可能看起来像这样

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

笔记

一个重要的问题是返回值可能是无序的。(事实上,return value不一定保存到queue,因为您可以选择任意线程安全的数据结构)

解决方案 25:

Kindall在 Python3 中的回答

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return

解决方案 26:

我不知道这对你们是否有效,但我选择创建一个全局对象 [主要是字典或嵌套数组],这样函数就可以访问该对象并对其进行变异,我知道这需要更多资源,但我们不是在处理量子科学,所以,我想我们可以提供更多的内存,前提是内存消耗随 CPU 使用率线性增加。这是一个示例代码:

import requests 
import json 
import string 
import random 
import threading
import time 
dictionary = {} 
def get_val1(L): 
    print('#1')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val2(L): 
    print('#2')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val3(L): 
    print('#3')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val4(L): 
    print('#4')
    for n,elem in enumerate(L):
        dictionary[elem]=json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
t1 = threading.Thread(target=get_val1,args=(L[0],)) 
t2 = threading.Thread(target=get_val2,args=(L[1],)) 
t3 = threading.Thread(target=get_val3,args=(L[2],))
t4 = threading.Thread(target=get_val4,args=(L[3],))

t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

这个程序运行4个线程,每个线程返回一些文本L[i] for i in L的数据,API返回的数据存储在字典中,它是否有用可能因程序而异,对于小型到中型的计算任务,这个对象变异工作得非常快,并使用更多资源的百分之几。

解决方案 27:

忘记所有复杂的解决方案。

只需安装合适的软件包,例如:
https: //pypi.org/project/thread-with-results/

解决方案 28:

我知道这个帖子很老了......但我遇到了同样的问题......如果你愿意使用thread.join()

import threading

class test:

    def __init__(self):
        self.msg=""

    def hello(self,bar):
        print('hello {}'.format(bar))
        self.msg="foo"


    def main(self):
        thread = threading.Thread(target=self.hello, args=('world!',))
        thread.start()
        thread.join()
        print(self.msg)

g=test()
g.main()

解决方案 29:

最好的方法...定义一个全局变量,然后在线程函数中更改该变量。无需传入或检索

from threading import Thread

# global var
random_global_var = 5

def function():
    global random_global_var
    random_global_var += 1

domath = Thread(target=function)
domath.start()
domath.join()
print(random_global_var)

# result: 6

解决方案 30:

# python 3.x
from queue import Queue
from threading import Thread
import time
import numpy as np

def foo(bar):
 
    time.sleep(np.random.randint(1,4,1).item())
    # print('hello{0}'.format(bar))
    return bar

que=Queue()

threads_list=list()

for i in range(0, 50):
    threads_list.append(Thread(target= lambda q, arg1: q.put(foo(arg1)), args=(que, str(i))))
    threads_list[-1].start()

for t in threads_list:
    t.join()

results=[]
while not que.empty():
    result = que.get()
    results.append(result)
    print(result)

print(results)
print(len(results))

enter code here
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1267  
  IPD(Integrated Product Development)即集成产品开发,是一套先进的、成熟的产品开发管理理念、模式和方法。随着市场竞争的日益激烈,企业对于提升产品开发效率、降低成本、提高产品质量的需求愈发迫切,IPD 项目管理咨询市场也迎来了广阔的发展空间。深入探讨 IPD 项目管理咨询的市场需求与发展,...
IPD集成产品开发流程   27  
  IPD(Integrated Product Development)产品开发流程是一套先进的、被广泛应用的产品开发管理体系,它涵盖了从产品概念产生到产品推向市场并持续优化的全过程。通过将市场、研发、生产、销售等多个环节紧密整合,IPD旨在提高产品开发的效率、质量,降低成本,增强企业的市场竞争力。深入了解IPD产品开发...
IPD流程中TR   31  
  IPD(Integrated Product Development)测试流程是确保产品质量、提升研发效率的关键环节。它贯穿于产品从概念到上市的整个生命周期,对企业的成功至关重要。深入理解IPD测试流程的核心要点,有助于企业优化研发过程,打造更具竞争力的产品。以下将详细阐述IPD测试流程的三大核心要点。测试策略规划测试...
华为IPD   26  
  华为作为全球知名的科技企业,其成功背后的管理体系备受关注。IPD(集成产品开发)流程作为华为核心的产品开发管理模式,在创新管理与技术突破方面发挥了至关重要的作用。深入剖析华为 IPD 流程中的创新管理与技术突破,对于众多企业探索自身发展路径具有重要的借鉴意义。IPD 流程概述IPD 流程是一种先进的产品开发管理理念和方...
TR评审   26  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用