How to use threading in Python?


Problem description:

I want a clear example showing how tasks are being divided across multiple threads.


Solution 1:

Since this question was asked in 2010, there has been real simplification in how to do simple multithreading with Python using map and pool.

The code below comes from an article/blog post that you should definitely check out (no affiliation) - Parallelism in one line: A Better Model for Day to Day Threading Tasks. I'll summarize below - it ends up being just a few lines of code:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Which is the multithreaded version of:

results = []
for item in my_array:
    results.append(my_function(item))

Description

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.

Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.
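
For instance, here is a tiny sketch of the built-in map on its own (no threading involved yet):

import math

numbers = [1, 4, 9, 16]
roots = list(map(math.sqrt, numbers))  # applies math.sqrt to every element
print(roots)  # [1.0, 2.0, 3.0, 4.0]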



Implementation

Two libraries provide parallelized versions of the map function: multiprocessing, and its little-known but equally fantastic stepchild: multiprocessing.dummy.

multiprocessing.dummy is exactly the same as the multiprocessing module, but uses threads instead (an important distinction - use multiple processes for CPU-intensive tasks; use threads for, and during, I/O):

multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.
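
In practice, the only line you change to switch between threads and processes is the import. A minimal sketch, assuming a hypothetical my_function and my_array:

from multiprocessing.dummy import Pool as ThreadPool  # threads: good for I/O-bound work
# from multiprocessing import Pool as ThreadPool      # processes: good for CPU-bound work

def my_function(x):
    return x * x

my_array = [1, 2, 3, 4]

pool = ThreadPool(4)
results = pool.map(my_function, my_array)
pool.close()
pool.join()
print(results)  # [1, 4, 9, 16]

Here is the answer's actual I/O-bound example: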

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

And the timing results:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Passing multiple arguments (works only in Python 3.3 and later):

To pass multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

Or to pass a constant and an array:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

If you are using an earlier version of Python, you can pass multiple arguments via this workaround.
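
The usual workaround on those older versions is to pack the arguments into tuples and unpack them inside a one-argument helper that map can call. A sketch, assuming a hypothetical two-argument function:

from multiprocessing.dummy import Pool as ThreadPool

def function(a, b):
    return a + b

def function_star(args):
    # Unpack one tuple into the two real arguments
    return function(*args)

list_a = [1, 2, 3]
list_b = [10, 20, 30]

pool = ThreadPool(2)
results = pool.map(function_star, zip(list_a, list_b))
pool.close()
pool.join()
print(results)  # [11, 22, 33]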

(Thanks to user136036 for the helpful comment.)

Solution 2:

Here's a simple example: you need to try a few alternative URLs and return the contents of the first one to respond.

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

This is a case where threading is used as a simple optimization: each subthread is waiting for a URL to resolve and respond, in order to put its contents on the queue; each thread is a daemon (won't keep the process up if the main thread ends - that's more common than not); the main thread starts all subthreads, does a get on the queue to wait until one of them has done a put, then emits the results and terminates (which takes down any subthreads that might still be running, since they're daemon threads).

Proper use of threads in Python is invariably connected to I/O operations (since CPython doesn't use multiple cores to run CPU-bound tasks anyway, the only reason for threading is not blocking the process while there's a wait for some I/O). Queues are almost invariably the best way to farm out work to threads and/or collect the work's results, by the way, and they're intrinsically thread-safe, so they save you from worrying about locks, conditions, events, semaphores, and other inter-thread coordination/communication concepts.
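
To illustrate that last point with a sketch of my own (not part of this answer): with a bare list you manage a Lock yourself, whereas a Queue is thread-safe out of the box:

import threading
import queue

lock = threading.Lock()
shared_list = []

def append_with_lock(item):
    with lock:  # explicit coordination; needed as soon as you do anything compound
        shared_list.append(item)

shared_q = queue.Queue()

def put_on_queue(item):
    shared_q.put(item)  # already thread-safe, nothing else to manage

for target in (append_with_lock, put_on_queue):
    threads = [threading.Thread(target=target, args=(i,)) for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print(sorted(shared_list))
print(sorted(shared_q.get() for _ in range(shared_q.qsize())))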

Solution 3:

NOTE: For actual parallelization in Python, you should use the multiprocessing module to fork multiple processes that execute in parallel (due to the global interpreter lock, Python threads provide interleaving, but they are in fact executed serially, not in parallel, and are only useful when interleaving I/O operations).

However, if you are merely looking for interleaving (or are doing I/O operations that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let's consider the problem of summing a big range by summing subranges in parallel:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Note that the above is a very stupid example, as it does absolutely no I/O and, due to the global interpreter lock, will be executed serially (with the added overhead of context switching) in CPython, even though the threads interleave.
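
For contrast, a sketch of mine (not part of this answer): the same subrange summing done with the multiprocessing module really does run in parallel on CPython:

from multiprocessing import Pool

def sum_range(bounds):
    low, high = bounds
    return sum(range(low, high))

if __name__ == '__main__':
    with Pool(2) as pool:
        partials = pool.map(sum_range, [(0, 500000), (500000, 1000000)])
    print(sum(partials))  # 499999500000 - same result, two worker processes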

Solution 4:

Like others mentioned, CPython can use threads only for I/O waits, due to the GIL.

If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Solution 5:

Just a note: a queue is not required for threading.

This is the simplest example I could imagine, showing 10 threads running concurrently.

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 11):  # 10 threads, numbered 1 through 10

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

Solution 6:

The answer from Alex Martelli helped me. However, here is a modified version that I found more useful (at least to me):

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

Solution 7:

Given a function, f, thread it like this:

import threading
threading.Thread(target=f).start()

To pass arguments to f:

threading.Thread(target=f, args=(a,b,c)).start()
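
Note that threading.Thread discards f's return value. If you need results back, a common pattern (a sketch under that assumption, not part of this answer) is to let each thread write into a shared container:

import threading

def f(x, results, index):
    results[index] = x * x  # store the result instead of returning it

results = [None] * 3
threads = [threading.Thread(target=f, args=(i, results, i)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)  # [0, 1, 4]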

Solution 8:

I found this very useful: create as many threads as there are cores and let them execute a (large) number of tasks (in this case, calling a shell program):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done
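
The workers above are daemon threads, so they are simply abandoned once q.join() returns and the program exits. If you prefer a clean shutdown, a common variant (my sketch, written with the Python 3 queue spelling) is to send one sentinel per worker:

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:  # sentinel: no more work, exit the loop
            q.task_done()
            break
        print("processing", item)
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

for i in range(30):
    q.put(i)
for _ in threads:
    q.put(None)  # one sentinel per worker

q.join()
for t in threads:
    t.join()  # joins cleanly; no daemon flag needed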

Solution 9:

Python 3 has the facility of launching parallel tasks. This makes our work easier.

It has thread pooling and process pooling.

The following gives an insight:

ThreadPoolExecutor example (source)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor example (source)

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:  # Note: this also rejects 2 itself; harmless here, since PRIMES holds large odd numbers
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Solution 10:

I saw a lot of examples here in which no real work was being performed, and they were mostly CPU-bound. Here is an example of a CPU-bound task that computes all the primes between 10 million and 10.5 million. I have used all four methods here:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if any number from 2 to the square root + 1 divides the current number under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible, we have found a factor, so this is not a prime number; let's move on to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)
if __name__ == "__main__":
    main()

Here are the results on my four-core Mac OS X machine:

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

Solution 11:

Using the shiny new concurrent.futures module:

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

For anyone who has ever touched Java, the executor approach may look familiar.

Also on a side note: to keep the universe sane, don't forget to close your pools/executors if you don't use a with context (which is so awesome that it does it for you).
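
If a with block doesn't fit your code, calling shutdown() explicitly performs the same cleanup. A minimal sketch:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
try:
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
finally:
    executor.shutdown(wait=True)  # what the with block would have done for you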

Solution 12:

For me, the perfect example for threading is monitoring asynchronous events. Look at this code.

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3

You can play with this code by opening an IPython session and doing something like:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>> a[0] = 2
Mon = 2

Wait a few minutes

>>> a[0] = 2
Mon = 2

Solution 13:

Most documentation and tutorials use Python's threading and Queue modules, which could seem overwhelming for beginners.

Perhaps consider the concurrent.futures.ThreadPoolExecutor module of Python 3 instead.

with子句和列表理解相结合,它可能会产生真正的魅力。

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

Solution 14:

Borrowing from this article, we learn how to choose between multithreading, multiprocessing, and async (asyncio), as well as their usage.

Python 3 has a new built-in library for concurrency and parallelism: concurrent.futures

So I will demonstrate, through an experiment, how to run four tasks (i.e., four .sleep() calls) with a thread pool:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker):
    futures = []
    tic = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))
        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())
    print(f'Total elapsed time by {max_worker} workers:', time()-tic)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Output:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

Notes:

  • As you can see in the results above, the best case was 3 workers for these four tasks.

  • If you have a CPU-bound task rather than an I/O-bound or blocking one, you can change ThreadPoolExecutor to ProcessPoolExecutor (multiprocessing instead of threading), as sketched below.
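
A sketch of that swap, reusing the same four sleep tasks as above (note that ProcessPoolExecutor needs the __main__ guard on platforms that spawn worker processes):

from concurrent.futures import ProcessPoolExecutor, as_completed
from time import sleep, time

def main():
    tic = time()
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(sleep, s) for s in (2, 1, 7, 3)]
        for future in as_completed(futures):
            future.result()  # sleep returns None; this just propagates errors
    print('Total elapsed time by 4 process workers:', time() - tic)

if __name__ == '__main__':
    main()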

Solution 15:

I would like to contribute with a simple example and the explanations I found useful when I had to tackle this problem myself.

In this answer you will find some information about Python's GIL (global interpreter lock) and a simple day-to-day example written using multiprocessing.dummy, plus some simple benchmarks.

Global Interpreter Lock (GIL)

Python doesn't allow multi-threading in the truest sense of the word. It has a multi-threading package, but if you want to multi-thread to speed your code up, it's usually not a good idea to use it.

Python has a construct called the global interpreter lock (GIL). The GIL makes sure that only one of your "threads" can execute at a time. A thread acquires the GIL, does a little work, then passes the GIL on to the next thread.

This happens very quickly, so to the human eye it may seem like your threads are executing in parallel, but in reality they are just taking turns using the same CPU core.

All this GIL passing adds overhead to execution. This means that if you want to make your code run faster, using the threading package is often not a good idea.

There are reasons to use Python's threading package. If you want to run some things simultaneously, and efficiency is not a concern, then it's totally fine and convenient. Or if you are running code that needs to wait for something (like some I/O), then it could make a lot of sense. But the threading library won't let you use extra CPU cores.

Multi-threading can be outsourced to the operating system (by doing multi-processing), to some external application that calls your Python code (for example, Spark or Hadoop), or to some code that your Python code calls (for example: you could have your Python code call a C function that does the expensive multi-threaded stuff).

Why this matters

Because lots of people spend a lot of time trying to find bottlenecks in their fancy Python multi-threaded code before they learn what the GIL is.

Once this information is clear, here's my code:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

Solution 16:

Here is a very simple example of importing a CSV file using threading. (Library inclusion may differ for different purposes.)

Helper function:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                pass  # DB operation/query

Driver function:

import_handler(csv_file_name)

Solution 17:

Here is a simple multithreading example that will be helpful. You can run it and easily understand how multithreading works in Python. I used a lock to prevent other threads from proceeding until the previous threads finished their work. With this line of code,

tLock = threading.BoundedSemaphore(value=4)

you can allow several threads at a time; the remaining threads wait and run later, after the earlier ones finish.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print "\nTimer: ", name, " Started"
    tLock.acquire()
    print "\n", name, " has acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\n", name, " is releasing the lock"
    tLock.release()
    print "\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "
Main Complete"

if __name__ == "__main__":
    Main()

Solution 18:

None of the previous solutions actually used multiple cores on my GNU/Linux server (where I don't have administrator rights). They just ran on a single core.

I used the lower-level os.fork interface to spawn multiple processes. This is the code that worked for me:

from os import fork

# my_function is assumed to be defined elsewhere
values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()  # returns 0 in the child and the child's PID in the parent
    if p == 0:
        my_function(values[i])
        break  # the child leaves the loop so it does not fork again
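
A slightly fuller sketch (my addition, assuming a hypothetical my_function and a POSIX system): the child exits explicitly so it can never fall through into parent-only code, and the parent reaps every child:

import os

def my_function(value):
    print(os.getpid(), value)

values = ['different', 'values', 'for', 'threads']

children = []
for value in values:
    pid = os.fork()
    if pid == 0:  # child process
        my_function(value)
        os._exit(0)  # leave immediately, skipping the parent's code below
    children.append(pid)  # parent keeps track of its children

for pid in children:
    os.waitpid(pid, 0)  # reap each child so no zombies are left behind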

Solution 19:

Here is a Python 3 version of Alex Martelli's answer:

import queue as Queue
import threading
import urllib.request

# Called by each thread
def get_url(q, url):
    q.put(urllib.request.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com", "http://www.python.org","https://wiki.python.org/moin/"]

q = Queue.Queue()
def thread_func():
    for u in theurls:
        t = threading.Thread(target=get_url, args = (q,u))
        t.daemon = True
        t.start()

    s = q.get()
    
def non_thread_func():
    for u in theurls:
        get_url(q,u)
        

    s = q.get()
   

You can test it with:

import time

start = time.time()
thread_func()
end = time.time()
print(end - start)

start = time.time()
non_thread_func()
end = time.time()
print(end - start)

The time taken by non_thread_func() should be about 4 times that of thread_func().

Solution 20:

Very easy to understand: here are two simple ways to do threading.

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def a(a=1, b=2):
    print(a)
    time.sleep(5)
    print(b)
    return a+b

def b(**kwargs):
    if "a" in kwargs:
        print("am b")
    else:
        print("nothing")
        
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)

for future in as_completed(to_do):
    print("Future {} and Future Return is {}
".format(future, future.result()))

print("threading")

to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))

for threads in to_do:
    threads.start()
    
for threads in to_do:
    threads.join()

Solution 21:

A multithreading example. The following threads run at the same time:

from threading import Thread

def fun_square(x):
    x_square = x**2
    print('x_square: ', x_square)

def x_pow_y(x,y):
    x_pow_y = x**y
    print('x_pow_y: ', x_pow_y)
    
def fun_qube(z):
    z_qube = z*z*z
    print('z_qube: ', z_qube)
    
def normal_fun():
    print("Normal fun is working at same time...")

Thread(target = fun_square, args=(5,)).start() #args=(x,)
Thread(target = x_pow_y, args=(2,4,)).start() #args=(x,y,)
Thread(target = fun_qube, args=(4,)).start() # pass the function itself, not fun_qube(4)
Thread(target = normal_fun).start()

Solution 22:

import threading
import requests

def send():
    r = requests.get('https://www.stackoverlow.com')

threads = []
t = threading.Thread(target=send)  # pass the function itself; target=send() would call it immediately
threads.append(t)
t.start()

Solution 23:

The code below can run 10 threads concurrently, each printing the numbers from 0 to 99:

from threading import Thread

def test():
    for i in range(0, 100):
        print(i)

thread_list = []

for _ in range(0, 10):
    thread = Thread(target=test)
    thread_list.append(thread)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

The code below is the shorthand for-loop version of the code above, likewise running 10 threads that concurrently print the numbers from 0 to 99:

from threading import Thread

def test():
    [print(i) for i in range(0, 100)]

thread_list = [Thread(target=test) for _ in range(0, 10)]

[thread.start() for thread in thread_list]

[thread.join() for thread in thread_list]

The results look like this:

...
99
83
97
84
98
99
85
86
87
88
...

Solution 24:

The easiest way to use threading/multiprocessing is to use a more high-level library such as autothread.

import autothread
from time import sleep as heavyworkload

@autothread.multithreaded() # <-- This is all you need to add
def example(x: int, y: int):
    heavyworkload(1)
    return x*y

Now you can feed your function lists of ints. autothread handles everything for you and just gives you the results computed in parallel.

result = example([1, 2, 3, 4, 5], 10)

Solution 25:

I just wanted to run at most 4 threads to execute a list of DOS commands. This is how to do it:

import subprocess
import threading


print ("Pyton Start ...
")

# Create a semaphore to limit the number of concurrent threads
max_threads = 4
semaphore = threading.Semaphore(max_threads)

# Function to run a command
def run_command(command):
    with semaphore:  # Acquire semaphore
        print(f"Running command: {command}")
        try:
            # Execute the command
            result = subprocess.run(command, shell=True, check=True, text=True, capture_output=True)
            print("Output:
", result.stdout)
            if result.stderr:
                print("Errors:
", result.stderr)
        except subprocess.CalledProcessError as e:
            print(f"An error occurred while executing '{command}': {e}")



# List of DOS commands to run
commands = ["copy test.dmp test1.dmp", "copy test.dmp test2.dmp", "copy test.dmp test3.dmp"]
threads = []
    
for command in commands:
    thread = threading.Thread(target=run_command, args=(command,))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()


print ("
Pyton END ...
")