PyQt 中使用 QThread 的后台线程

2024-11-29 08:42:00
admin
原创
132
摘要:问题描述:我有一个程序,它通过我在 PyQt 中编写的 GUI 与我正在使用的无线电交互。显然,无线电的主要功能之一是传输数据,但为了连续执行此操作,我必须循环写入,这会导致 GUI 挂起。由于我从未处理过线程,我尝试使用摆脱这些挂起,QCoreApplication.processEvents().但是无线...

问题描述:

我有一个程序,它通过我在 PyQt 中编写的 GUI 与我正在使用的无线电交互。显然,无线电的主要功能之一是传输数据,但为了连续执行此操作,我必须循环写入,这会导致 GUI 挂起。由于我从未处理过线程,我尝试使用摆脱这些挂起,QCoreApplication.processEvents().但是无线电需要在传输之间休眠,因此 GUI 仍然会根据这些休眠持续时间挂起。

有没有一种简单的方法可以使用 QThread 来解决这个问题?我查找过有关如何使用 PyQt 实现多线程的教程,但大多数教程都涉及设置服务器,而且比我需要的要高级得多。老实说,我甚至不需要我的线程在运行时更新任何东西,我只需要启动它,让它在后台传输,然后停止它。


解决方案 1:

我创建了一个小示例,展示了处理线程的 3 种不同且简单的方法。我希望它能帮助您找到解决问题的正确方法。

import sys
import time

from PyQt5.QtCore import (QCoreApplication, QObject, QRunnable, QThread,
                          QThreadPool, pyqtSignal)


# Subclassing QThread
# http://qt-project.org/doc/latest/qthread.html
class AThread(QThread):

    def run(self):
        count = 0
        while count < 5:
            time.sleep(1)
            print("A Increasing")
            count += 1

# Subclassing QObject and using moveToThread
# http://blog.qt.digia.com/blog/2007/07/05/qthreads-no-longer-abstract
class SomeObject(QObject):

    finished = pyqtSignal()

    def long_running(self):
        count = 0
        while count < 5:
            time.sleep(1)
            print("B Increasing")
            count += 1
        self.finished.emit()

# Using a QRunnable
# http://qt-project.org/doc/latest/qthreadpool.html
# Note that a QRunnable isn't a subclass of QObject and therefore does
# not provide signals and slots.
class Runnable(QRunnable):

    def run(self):
        count = 0
        app = QCoreApplication.instance()
        while count < 5:
            print("C Increasing")
            time.sleep(1)
            count += 1
        app.quit()


def using_q_thread():
    app = QCoreApplication([])
    thread = AThread()
    thread.finished.connect(app.exit)
    thread.start()
    sys.exit(app.exec_())

def using_move_to_thread():
    app = QCoreApplication([])
    objThread = QThread()
    obj = SomeObject()
    obj.moveToThread(objThread)
    obj.finished.connect(objThread.quit)
    objThread.started.connect(obj.long_running)
    objThread.finished.connect(app.exit)
    objThread.start()
    sys.exit(app.exec_())

def using_q_runnable():
    app = QCoreApplication([])
    runnable = Runnable()
    QThreadPool.globalInstance().start(runnable)
    sys.exit(app.exec_())

if __name__ == "__main__":
    #using_q_thread()
    #using_move_to_thread()
    using_q_runnable()

解决方案 2:

以 PyQt5、python 3.4 更新的答案为例

使用此作为模式来启动一个不获取数据并在数据可用于表单时返回数据的工作者。

1-Worker类变得更小,并放入自己的文件worker.py中,以便于记忆和独立的软件重用。

2 - main.py 文件是定义 GUI 表单类的文件

3-线程对象没有子类化。

4 - 线程对象和工作对象都属于 Form 对象

5-程序步骤在注释中。

# worker.py
from PyQt5.QtCore import QThread, QObject, pyqtSignal, pyqtSlot
import time


class Worker(QObject):
    finished = pyqtSignal()
    intReady = pyqtSignal(int)

    @pyqtSlot()
    def procCounter(self): # A slot takes no params
        for i in range(1, 100):
            time.sleep(1)
            self.intReady.emit(i)

        self.finished.emit()

主要文件是:

# main.py
from PyQt5.QtCore import QThread
from PyQt5.QtWidgets import QApplication, QLabel, QWidget, QGridLayout
import sys
import worker
    
    
class Form(QWidget):
 
    def __init__(self):
       super().__init__()
       self.label = QLabel("0")

       # 1 - create Worker and Thread inside the Form
       self.obj = worker.Worker()  # no parent!
       self.thread = QThread()  # no parent!

       # 2 - Connect Worker`s Signals to Form method slots to post data.
       self.obj.intReady.connect(self.onIntReady)

       # 3 - Move the Worker object to the Thread object
       self.obj.moveToThread(self.thread)

       # 4 - Connect Worker Signals to the Thread slots
       self.obj.finished.connect(self.thread.quit)

       # 5 - Connect Thread started signal to Worker operational slot method
       self.thread.started.connect(self.obj.procCounter)

       # * - Thread finished signal will close the app if you want!
       #self.thread.finished.connect(app.exit)

       # 6 - Start the thread
       self.thread.start()

       # 7 - Start the form
       self.initUI()
  
    def initUI(self):
        grid = QGridLayout()
        self.setLayout(grid)
        grid.addWidget(self.label,0,0)

        self.move(300, 150)
        self.setWindowTitle('thread test')
        self.show()

    def onIntReady(self, i):
        self.label.setText("{}".format(i))
        #print(i)

app = QApplication(sys.argv)
  
form = Form()
    
sys.exit(app.exec_())

解决方案 3:

根据 Qt 开发人员的说法,子类化 QThread 是不正确的(请参阅http://blog.qt.io/blog/2010/06/17/youre-doing-it-wrong/)。但那篇文章真的很难理解(而且标题有点居高临下)。我发现了一篇更好的博客文章,它更详细地解释了为什么你应该使用一种线程样式而不是另一种:http://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/

此外,我强烈推荐KDAB 的有关线程间信号和槽的这个视频。

我认为,您可能永远不应该为了重载 run 方法而对线程进行子类化。虽然这样做确实可行,但您基本上是在绕过 Qt 希望您如何工作。此外,您会错过诸如事件和适当的线程安全信号和插槽之类的东西。此外,正如您可能在上面的博客文章中看到的那样,“正确的”线程方式迫使您编写更易于测试的代码。

这里有几个如何在 PyQt 中利用 QThreads 的例子(我在下面发布了一个单独的答案,正确使用了 QRunnable 并结合了信号/槽,如果您有很多需要负载平衡的异步任务,这个答案会更好)。

import sys
from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt

# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    finished = QtCore.pyqtSignal()
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        print "Worker.processA()"
        self.finished.emit()
    
    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        print "Worker.processB()"
        for thing in bar:
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


class Thread(QtCore.QThread):
    """Need for PyQt4 <= 4.6 only"""
    def __init__(self, parent=None):
        QtCore.QThread.__init__(self, parent)
    
     # this class is solely needed for these two methods, there
     # appears to be a bug in PyQt 4.6 that requires you to
     # explicitly call run and start from the subclass in order
     # to get the thread to actually start an event loop

    def start(self):
        QtCore.QThread.start(self)

    def run(self):
        QtCore.QThread.run(self)


app = QtGui.QApplication(sys.argv)

thread = Thread() # no parent!
obj = Worker() # no parent!
obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for 
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()

# Without this you may get weird QThread messages in the shell on exit
app.deleteLater()        

解决方案 4:

Matt 的例子非常好,我修正了拼写错误,而且 pyqt4.8 现在很常见,所以我删除了虚拟类,并添加了一个 dataReady 信号的示例

# -*- coding: utf-8 -*-
import sys
from PyQt4 import QtCore, QtGui
from PyQt4.QtCore import Qt


# very testable class (hint: you can use mock.Mock for the signals)
class Worker(QtCore.QObject):
    finished = QtCore.pyqtSignal()
    dataReady = QtCore.pyqtSignal(list, dict)

    @QtCore.pyqtSlot()
    def processA(self):
        print "Worker.processA()"
        self.finished.emit()

    @QtCore.pyqtSlot(str, list, list)
    def processB(self, foo, bar=None, baz=None):
        print "Worker.processB()"
        for thing in bar:
            # lots of processing...
            self.dataReady.emit(['dummy', 'data'], {'dummy': ['data']})
        self.finished.emit()


def onDataReady(aList, aDict):
    print 'onDataReady'
    print repr(aList)
    print repr(aDict)


app = QtGui.QApplication(sys.argv)

thread = QtCore.QThread()  # no parent!
obj = Worker()  # no parent!
obj.dataReady.connect(onDataReady)

obj.moveToThread(thread)

# if you want the thread to stop after the worker is done
# you can always call thread.start() again later
obj.finished.connect(thread.quit)

# one way to do it is to start processing as soon as the thread starts
# this is okay in some cases... but makes it harder to send data to
# the worker object from the main gui thread.  As you can see I'm calling
# processA() which takes no arguments
thread.started.connect(obj.processA)
thread.finished.connect(app.exit)

thread.start()

# another way to do it, which is a bit fancier, allows you to talk back and
# forth with the object in a thread safe way by communicating through signals
# and slots (now that the thread is running I can start calling methods on
# the worker object)
QtCore.QMetaObject.invokeMethod(obj, 'processB', Qt.QueuedConnection,
                                QtCore.Q_ARG(str, "Hello World!"),
                                QtCore.Q_ARG(list, ["args", 0, 1]),
                                QtCore.Q_ARG(list, []))

# that looks a bit scary, but its a totally ok thing to do in Qt,
# we're simply using the system that Signals and Slots are built on top of,
# the QMetaObject, to make it act like we safely emitted a signal for
# the worker thread to pick up when its event loop resumes (so if its doing
# a bunch of work you can call this method 10 times and it will just queue
# up the calls.  Note: PyQt > 4.6 will not allow you to pass in a None
# instead of an empty list, it has stricter type checking

app.exec_()

解决方案 5:

在 PyQt 中,有很多选项可用于获取异步行为。对于需要事件处理的东西(即 QtNetwork 等),您应该使用我在此线程上的其他答案中提供的 QThread 示例。但对于绝大多数线程需求,我认为此解决方案远优于其他方法。

这样做的好处是 QThreadPool 会将您的 QRunnable 实例作为任务进行调度。这类似于英特尔 TBB 中使用的任务模式。它并不像我喜欢的那样优雅,但它确实实现了出色的异步行为。

这样,您就可以通过 QRunnable 在 Python 中充分利用 Qt 的大部分线程功能,同时还能利用信号和槽。我在多个应用程序中使用了相同的代码,有些应用程序会进行数百次异步 REST 调用,有些应用程序会打开文件或列出目录,最好的部分是使用此方法,Qt 任务会为我平衡系统资源。

import time
from PyQt4 import QtCore
from PyQt4 import QtGui
from PyQt4.QtCore import Qt


def async(method, args, uid, readycb, errorcb=None):
    """
    Asynchronously runs a task

    :param func method: the method to run in a thread
    :param object uid: a unique identifier for this task (used for verification)
    :param slot updatecb: the callback when data is receieved cb(uid, data)
    :param slot errorcb: the callback when there is an error cb(uid, errmsg)

    The uid option is useful when the calling code makes multiple async calls
    and the callbacks need some context about what was sent to the async method.
    For example, if you use this method to thread a long running database call
    and the user decides they want to cancel it and start a different one, the
    first one may complete before you have a chance to cancel the task.  In that
    case, the "readycb" will be called with the cancelled task's data.  The uid
    can be used to differentiate those two calls (ie. using the sql query).

    :returns: Request instance
    """
    request = Request(method, args, uid, readycb, errorcb)
    QtCore.QThreadPool.globalInstance().start(request)
    return request


class Request(QtCore.QRunnable):
    """
    A Qt object that represents an asynchronous task

    :param func method: the method to call
    :param list args: list of arguments to pass to method
    :param object uid: a unique identifier (used for verification)
    :param slot readycb: the callback used when data is receieved
    :param slot errorcb: the callback used when there is an error

    The uid param is sent to your error and update callbacks as the
    first argument. It's there to verify the data you're returning

    After created it should be used by invoking:

    .. code-block:: python

       task = Request(...)
       QtCore.QThreadPool.globalInstance().start(task)

    """
    INSTANCES = []
    FINISHED = []
    def __init__(self, method, args, uid, readycb, errorcb=None):
        super(Request, self).__init__()
        self.setAutoDelete(True)
        self.cancelled = False

        self.method = method
        self.args = args
        self.uid = uid
        self.dataReady = readycb
        self.dataError = errorcb

        Request.INSTANCES.append(self)

        # release all of the finished tasks
        Request.FINISHED = []

    def run(self):
        """
        Method automatically called by Qt when the runnable is ready to run.
        This will run in a separate thread.
        """
        # this allows us to "cancel" queued tasks if needed, should be done
        # on shutdown to prevent the app from hanging
        if self.cancelled:
            self.cleanup()
            return

        # runs in a separate thread, for proper async signal/slot behavior
        # the object that emits the signals must be created in this thread.
        # Its not possible to run grabber.moveToThread(QThread.currentThread())
        # so to get this QObject to properly exhibit asynchronous
        # signal and slot behavior it needs to live in the thread that
        # we're running in, creating the object from within this thread
        # is an easy way to do that.
        grabber = Requester()
        grabber.Loaded.connect(self.dataReady, Qt.QueuedConnection)
        if self.dataError is not None:
            grabber.Error.connect(self.dataError, Qt.QueuedConnection)

        try:
            result = self.method(*self.args)
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Loaded.emit(self.uid, result)
        except Exception as error:
            if self.cancelled:
                # cleanup happens in 'finally' statement
                return
            grabber.Error.emit(self.uid, unicode(error))
        finally:
            # this will run even if one of the above return statements
            # is executed inside of the try/except statement see:
            # https://docs.python.org/2.7/tutorial/errors.html#defining-clean-up-actions
            self.cleanup(grabber)

    def cleanup(self, grabber=None):
        # remove references to any object or method for proper ref counting
        self.method = None
        self.args = None
        self.uid = None
        self.dataReady = None
        self.dataError = None

        if grabber is not None:
            grabber.deleteLater()

        # make sure this python obj gets cleaned up
        self.remove()

    def remove(self):
        try:
            Request.INSTANCES.remove(self)

            # when the next request is created, it will clean this one up
            # this will help us avoid this object being cleaned up
            # when it's still being used
            Request.FINISHED.append(self)
        except ValueError:
            # there might be a race condition on shutdown, when shutdown()
            # is called while the thread is still running and the instance
            # has already been removed from the list
            return

    @staticmethod
    def shutdown():
        for inst in Request.INSTANCES:
            inst.cancelled = True
        Request.INSTANCES = []
        Request.FINISHED = []


class Requester(QtCore.QObject):
    """
    A simple object designed to be used in a separate thread to allow
    for asynchronous data fetching
    """

    #
    # Signals
    #

    Error = QtCore.pyqtSignal(object, unicode)
    """
    Emitted if the fetch fails for any reason

    :param unicode uid: an id to identify this request
    :param unicode error: the error message
    """

    Loaded = QtCore.pyqtSignal(object, object)
    """
    Emitted whenever data comes back successfully

    :param unicode uid: an id to identify this request
    :param list data: the json list returned from the GET
    """

    NetworkConnectionError = QtCore.pyqtSignal(unicode)
    """
    Emitted when the task fails due to a network connection error

    :param unicode message: network connection error message
    """

    def __init__(self, parent=None):
        super(Requester, self).__init__(parent)


class ExampleObject(QtCore.QObject):
    def __init__(self, parent=None):
        super(ExampleObject, self).__init__(parent)
        self.uid = 0
        self.request = None

    def ready_callback(self, uid, result):
        if uid != self.uid:
            return
        print "Data ready from %s: %s" % (uid, result)

    def error_callback(self, uid, error):
        if uid != self.uid:
            return
        print "Data error from %s: %s" % (uid, error)

    def fetch(self):
        if self.request is not None:
            # cancel any pending requests
            self.request.cancelled = True
            self.request = None

        self.uid += 1
        self.request = async(slow_method, ["arg1", "arg2"], self.uid,
                             self.ready_callback,
                             self.error_callback)


def slow_method(arg1, arg2):
    print "Starting slow method"
    time.sleep(1)
    return arg1 + arg2


if __name__ == "__main__":
    import sys
    app = QtGui.QApplication(sys.argv)

    obj = ExampleObject()

    dialog = QtGui.QDialog()
    layout = QtGui.QVBoxLayout(dialog)
    button = QtGui.QPushButton("Generate", dialog)
    progress = QtGui.QProgressBar(dialog)
    progress.setRange(0, 0)
    layout.addWidget(button)
    layout.addWidget(progress)
    button.clicked.connect(obj.fetch)
    dialog.show()

    app.exec_()
    app.deleteLater() # avoids some QThread messages in the shell on exit
    # cancel all running tasks avoid QThread/QTimer error messages
    # on exit
    Request.shutdown()

退出应用程序时,您需要确保取消所有任务,否则应用程序将挂起,直到每个计划的任务都完成

解决方案 6:

根据其他答案中提到的 Worker 对象方法,我决定看看是否可以扩展解决方案以调用更多线程 - 在这种情况下,机器可以运行的最佳数量,并启动具有不确定完成时间的多个工作器。为此,我仍然需要子类化 QThread - 但只需分配一个线程号并“重新实现”信号“完成”和“启动”以包含它们的线程号。

我非常关注主 GUI、线程和工作程序之间的信号。

同样,其他答案也费尽心思指出不将 QThread 作为父级,但我不认为这是一个真正的问题。但是,我的代码也小心地销毁了 QThread 对象。

但是,我无法为工作对象指定父级,因此似乎最好向它们发送 deleteLater() 信号,无论是在线程函数完成时还是在 GUI 被销毁时。我自己的代码就因为没有这样做而挂起。

我认为另一个必要的改进是重新实现 GUI (QWidget) 的 closeEvent,这样线程将被指示退出,然后 GUI 将等待直到所有线程完成。当我玩这个问题的其他一些答案时,我得到了 QThread 被破坏的错误。

也许这对其他人有用。我确实发现这是一个有用的练习。也许其他人会知道线程宣布其身份的更好方法。

#!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  To demonstrate creation of multiple threads and identify the receipt of thread results
# Created: 19/12/15

import sys


from PyQt4.QtCore import QThread, pyqtSlot, pyqtSignal
from PyQt4.QtGui import QApplication, QLabel, QWidget, QGridLayout

import sys
import worker

class Thread(QThread):
    #make new signals to be able to return an id for the thread
    startedx = pyqtSignal(int)
    finishedx = pyqtSignal(int)

    def __init__(self,i,parent=None):
        super().__init__(parent)
        self.idd = i

        self.started.connect(self.starttt)
        self.finished.connect(self.finisheddd)

    @pyqtSlot()
    def starttt(self):
        print('started signal from thread emitted')
        self.startedx.emit(self.idd) 

    @pyqtSlot()
    def finisheddd(self):
        print('finished signal from thread emitted')
        self.finishedx.emit(self.idd)

class Form(QWidget):

    def __init__(self):
        super().__init__()

        self.initUI()

        self.worker={}
        self.threadx={}
        self.i=0
        i=0

        #Establish the maximum number of threads the machine can optimally handle
        #Generally relates to the number of processors

        self.threadtest = QThread(self)
        self.idealthreadcount = self.threadtest.idealThreadCount()

        print("This machine can handle {} threads optimally".format(self.idealthreadcount))

        while i <self.idealthreadcount:
            self.setupThread(i)
            i+=1

        i=0
        while i<self.idealthreadcount:
            self.startThread(i)
            i+=1

        print("Main Gui running in thread {}.".format(self.thread()))


    def setupThread(self,i):

        self.worker[i]= worker.Worker(i)  # no parent!
        #print("Worker object runningt in thread {} prior to movetothread".format(self.worker[i].thread()) )
        self.threadx[i] = Thread(i,parent=self)  #  if parent isn't specified then need to be careful to destroy thread 
        self.threadx[i].setObjectName("python thread{}"+str(i))
        #print("Thread object runningt in thread {} prior to movetothread".format(self.threadx[i].thread()) )
        self.threadx[i].startedx.connect(self.threadStarted)
        self.threadx[i].finishedx.connect(self.threadFinished)

        self.worker[i].finished.connect(self.workerFinished)
        self.worker[i].intReady.connect(self.workerResultReady)

        #The next line is optional, you may want to start the threads again without having to create all the code again.
        self.worker[i].finished.connect(self.threadx[i].quit)

        self.threadx[i].started.connect(self.worker[i].procCounter)

        self.destroyed.connect(self.threadx[i].deleteLater)
        self.destroyed.connect(self.worker[i].deleteLater)

        #This is the key code that actually get the worker code onto another processor or thread.
        self.worker[i].moveToThread(self.threadx[i])

    def startThread(self,i):
        self.threadx[i].start()

    @pyqtSlot(int)
    def threadStarted(self,i):
        print('Thread {}  started'.format(i))
        print("Thread priority is {}".format(self.threadx[i].priority()))        


    @pyqtSlot(int)
    def threadFinished(self,i):
        print('Thread {} finished'.format(i))




    @pyqtSlot(int)
    def threadTerminated(self,i):
        print("Thread {} terminated".format(i))

    @pyqtSlot(int,int)
    def workerResultReady(self,j,i):
        print('Worker {} result returned'.format(i))
        if i ==0:
            self.label1.setText("{}".format(j))
        if i ==1:
            self.label2.setText("{}".format(j))
        if i ==2:
            self.label3.setText("{}".format(j))
        if i ==3:
            self.label4.setText("{}".format(j)) 

        #print('Thread {} has started'.format(self.threadx[i].currentThreadId()))    

    @pyqtSlot(int)
    def workerFinished(self,i):
        print('Worker {} finished'.format(i))

    def initUI(self):
        self.label1 = QLabel("0")
        self.label2= QLabel("0")
        self.label3= QLabel("0")
        self.label4 = QLabel("0")
        grid = QGridLayout(self)
        self.setLayout(grid)
        grid.addWidget(self.label1,0,0)
        grid.addWidget(self.label2,0,1) 
        grid.addWidget(self.label3,0,2) 
        grid.addWidget(self.label4,0,3) #Layout parents the self.labels

        self.move(300, 150)
        self.setGeometry(0,0,300,300)
        #self.size(300,300)
        self.setWindowTitle('thread test')
        self.show()

    def closeEvent(self, event):
        print('Closing')

        #this tells the threads to stop running
        i=0
        while i <self.idealthreadcount:
            self.threadx[i].quit()
            i+=1

         #this ensures window cannot be closed until the threads have finished.
        i=0
        while i <self.idealthreadcount:
            self.threadx[i].wait() 
            i+=1        


        event.accept()


if __name__=='__main__':
    app = QApplication(sys.argv)
    form = Form()
    sys.exit(app.exec_())

下面是工人代码

#!/usr/bin/env python3
#coding:utf-8
# Author:   --<>
# Purpose:  Stack Overflow
# Created: 19/12/15

import sys
import unittest


from PyQt4.QtCore import QThread, QObject, pyqtSignal, pyqtSlot
import time
import random


class Worker(QObject):
    finished = pyqtSignal(int)
    intReady = pyqtSignal(int,int)

    def __init__(self, i=0):
        '''__init__ is called while the worker is still in the Gui thread. Do not put slow or CPU intensive code in the __init__ method'''

        super().__init__()
        self.idd = i



    @pyqtSlot()
    def procCounter(self): # This slot takes no params
        for j in range(1, 10):
            random_time = random.weibullvariate(1,2)
            time.sleep(random_time)
            self.intReady.emit(j,self.idd)
            print('Worker {0} in thread {1}'.format(self.idd, self.thread().idd))

        self.finished.emit(self.idd)


if __name__=='__main__':
    unittest.main()

解决方案 7:

PySide2解决方案:

与 PyQt5 不同,在 PySide2 中,QThread.started 信号是在原始线程而不是工作线程上接收/处理的!幸运的是,它仍然在工作线程上接收所有其他信号。

为了匹配 PyQt5 的行为,您必须自己创建已启动信号。

这是一个简单的解决方案:

# Use this class instead of QThread
class QThread2(QThread):
    # Use this signal instead of "started"
    started2 = Signal()

    def __init__(self):
        QThread.__init__(self)
        self.started.connect(self.onStarted)

    def onStarted(self):
        self.started2.emit()
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1048  
  在产品开发领域,如何提升产品交付质量一直是企业关注的焦点。集成产品开发(IPD)作为一种系统化的产品开发方法,通过跨职能团队的协同、流程的优化以及资源的整合,能够有效提升产品的交付质量。IPD培训作为推动这一方法落地的重要工具,不仅能够帮助团队理解IPD的核心原则,还能通过实践和案例学习,提升团队的执行力和协作效率。本...
IPD研发管理体系   0  
  在现代企业中,跨部门合作已成为项目成功的关键因素之一。随着业务复杂性的增加,单一部门难以独立完成复杂的项目任务,因此需要多个部门的协同努力。然而,跨部门合作往往面临沟通不畅、职责不清、资源冲突等挑战,这些问题如果得不到有效解决,将直接影响项目的进度和质量。在这种背景下,IPD(集成产品开发)项目流程图作为一种系统化的管...
华为IPD流程   0  
  在研发IPD(集成产品开发)流程中,跨部门协作是确保项目成功的关键因素之一。IPD流程强调从概念到市场的全生命周期管理,涉及市场、研发、制造、供应链等多个部门的协同工作。然而,由于各部门的目标、工作方式和优先级不同,跨部门协作往往面临沟通不畅、资源冲突、决策延迟等挑战。为了应对这些挑战,企业需要采取系统化的方法,优化跨...
IPD概念阶段   0  
  在项目管理的生命周期中,CDCP(Concept Development and Control Plan)阶段是项目从概念到实施的关键过渡期。这一阶段不仅需要明确项目的目标和范围,还需要确保项目团队能够灵活应对可能出现的变更和调整。变更管理在这一阶段尤为重要,因为任何未经控制的变更都可能对项目的进度、成本和质量产生深...
IPD流程中TR   0  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用