在 Python 中创建线程

2025-01-20 09:07:00
admin
原创
112
摘要:问题描述:我有一个脚本,我希望一个函数与另一个函数同时运行。我看过的示例代码:import threading def MyThread (threading.thread): # doing something........ def MyThread2 (threading.thread): ...

问题描述:

我有一个脚本,我希望一个函数与另一个函数同时运行。

我看过的示例代码:

import threading

def MyThread (threading.thread):
    # doing something........

def MyThread2 (threading.thread):
    # doing something........

MyThread().start()
MyThread2().start()

我无法让它正常工作。我更愿意使用线程函数而不是类来实现它。

这是工作脚本:

from threading import Thread

class myClass():

    def help(self):
        os.system('./ssh.py')

    def nope(self):
        a = [1,2,3,4,5,6,67,78]
        for i in a:
            print i
            sleep(1)


if __name__ == "__main__":
    Yep = myClass()
    thread = Thread(target = Yep.help)
    thread2 = Thread(target = Yep.nope)
    thread.start()
    thread2.start()
    thread.join()
    print 'Finished'

解决方案 1:

您不需要使用子类Thread来实现这一点 - 请看我下面发布的简单示例以了解如何操作:

from threading import Thread
from time import sleep

def threaded_function(arg):
    for i in range(arg):
        print("running")
        sleep(1)


if __name__ == "__main__":
    thread = Thread(target = threaded_function, args = (10, ))
    thread.start()
    thread.join()
    print("thread finished...exiting")

这里我展示了如何使用线程模块创建一个线程,该线程将调用一个普通函数作为其目标。您可以看到我如何在线程构造函数中向它传递我需要的任何参数。

解决方案 2:

您的代码存在一些问题:

def MyThread ( threading.thread ):
  • 不能用函数作为子类,只能用类作为子类

  • 如果你要使用子类,你需要的是threading.Thread,而不是threading.thread

如果您确实只想使用函数来执行此操作,则有两个选择:

threading

import threading
def MyThread1():
    pass
def MyThread2():
    pass

t1 = threading.Thread(target=MyThread1, args=[])
t2 = threading.Thread(target=MyThread2, args=[])
t1.start()
t2.start()

_thread

import _thread
def MyThread1():
    pass
def MyThread2():
    pass

_thread.start_new_thread(MyThread1, ())
_thread.start_new_thread(MyThread2, ())

_thread.start_new_thread文档

解决方案 3:

我尝试添加另一个 join(),它似乎有效。以下是代码

from threading import Thread
from time import sleep

def function01(arg,name):
    for i in range(arg):
        print(name,'i---->',i,'
')
        print (name,"arg---->",arg,'
')
        sleep(1)

def test01():
    thread1 = Thread(target = function01, args = (10,'thread1', ))
    thread1.start()
    thread2 = Thread(target = function01, args = (10,'thread2', ))
    thread2.start()
    thread1.join()
    thread2.join()
    print ("thread finished...exiting")

test01()

解决方案 4:

使用实现多线程进程的简单方法threading

相同的代码片段

import threading

#function which takes some time to process 
def say(i):
    time.sleep(1)
    print(i)

threads = []
for i in range(10):
    
    thread = threading.Thread(target=say, args=(i,))

    thread.start()
    threads.append(thread)

#wait for all threads to complete before main program exits 
for thread in threads:
    thread.join()

解决方案 5:

Python 3 具有启动并行任务的功能。这使我们的工作更加轻松。

它具有线程池和进程池。

以下给出了一个见解:

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

另一个例子

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

解决方案 6:

您可以使用构造函数target中的参数Thread直接传入被调用的函数,而不是run

解决方案 7:

您是否重写了 run() 方法?如果重写了__init__,您是否确保调用了基类threading.Thread.__init__()

启动两个线程后,主线程是否会继续无限期地在子线程上工作/阻止/加入,以便主线程执行不会在子线程完成其任务之前结束?

最后,您是否遇到任何未处理的异常?

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用