threading.Timer - 每 'n' 秒重复一次函数

2024-12-30 08:42:00
admin
原创
109
摘要:问题描述:我想每 0.5 秒触发一次函数,并能够启动、停止和重置计时器。我不太了解 Python 线程的工作原理,并且在使用 Python 计时器时遇到了困难。但是,RuntimeError: threads can only be started once当我执行两次时,我还是会遇到这种情况。有办法解决这个...

问题描述:

我想每 0.5 秒触发一次函数,并能够启动、停止和重置计时器。我不太了解 Python 线程的工作原理,并且在使用 Python 计时器时遇到了困难。

但是,RuntimeError: threads can only be started once当我执行两次时,我还是会遇到这种情况。有办法解决这个问题吗?我尝试在每次启动前threading.timer.start()应用。threading.timer.cancel()

伪代码:

t=threading.timer(0.5,function)
while True:
    t.cancel()
    t.start()

解决方案 1:

最好的方法是启动一次定时器线程。在定时器线程中,你可以编写以下代码

class MyThread(Thread):
    def __init__(self, event):
        Thread.__init__(self)
        self.stopped = event

    def run(self):
        while not self.stopped.wait(0.5):
            print("my thread")
            # call a function

在启动计时器的代码中,您可以使用set停止事件来停止计时器。

stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()

解决方案 2:

对Hans Then 的回答进行一些改进,我们可以直接将 Timer 函数子类化。以下成为我们整个“重复计时器”代码,它可以用作具有所有相同参数的 threading.Timer 的替代品:

from threading import Timer

class RepeatTimer(Timer):
    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

使用示例:

def dummyfn(msg="foo"):
    print(msg)

timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()

产生以下输出:

foo
foo
foo
foo

timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()

生产

bar
bar
bar
bar

解决方案 3:

来自Python 中 setInterval 的等效项:

import threading

def setInterval(interval):
    def decorator(function):
        def wrapper(*args, **kwargs):
            stopped = threading.Event()

            def loop(): # executed in another thread
                while not stopped.wait(interval): # until stopped
                    function(*args, **kwargs)

            t = threading.Thread(target=loop)
            t.daemon = True # stop if the program exits
            t.start()
            return stopped
        return wrapper
    return decorator

用法:

@setInterval(.5)
def function():
    "..."

stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set() 

或者这是相同的功能,但是作为独立函数而不是装饰器:

cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls() 

以下是不使用线程来实现此目的的方法。

解决方案 4:

使用计时器线程

from threading import Timer,Thread,Event


class perpetualTimer():

   def __init__(self,t,hFunction):
      self.t=t
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      self.thread.start()

   def start(self):
      self.thread.start()

   def cancel(self):
      self.thread.cancel()

def printer():
    print 'ipsem lorem'

t = perpetualTimer(5,printer)
t.start()

这可以通过以下方式阻止t.cancel()

解决方案 5:

为了按照 OP 的要求使用 Timer 提供正确的答案,我将改进swapnil jariwala 的答案:

from threading import Timer


class InfiniteTimer():
    """A Timer class that does not stop, unless you want it to."""

    def __init__(self, seconds, target):
        self._should_continue = False
        self.is_running = False
        self.seconds = seconds
        self.target = target
        self.thread = None

    def _handle_target(self):
        self.is_running = True
        self.target()
        self.is_running = False
        self._start_timer()

    def _start_timer(self):
        if self._should_continue: # Code could have been running when cancel was called.
            self.thread = Timer(self.seconds, self._handle_target)
            self.thread.start()

    def start(self):
        if not self._should_continue and not self.is_running:
            self._should_continue = True
            self._start_timer()
        else:
            print("Timer already started or running, please wait if you're restarting.")

    def cancel(self):
        if self.thread is not None:
            self._should_continue = False # Just in case thread is running and cancel fails.
            self.thread.cancel()
        else:
            print("Timer never started or failed to initialize.")


def tick():
    print('ipsem lorem')

# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()

解决方案 6:

我更改了 swapnil-jariwala 代码中的某些代码来制作一个小型控制台时钟。

from threading import Timer, Thread, Event
from datetime import datetime

class PT():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

def printer():
    tempo = datetime.today()
    h,m,s = tempo.hour, tempo.minute, tempo.second
    print(f"{h}:{m}:{s}")


t = PT(1, printer)
t.start()

输出

>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...

带有 tkinter 图形界面的计时器

此代码将时钟计时器放在 tkinter 的一个小窗口中

from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk

app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


def printer():
    tempo = datetime.today()
    clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
    try:
        lab['text'] = clock
    except RuntimeError:
        exit()


t = perpetualTimer(1, printer)
t.start()
app.mainloop()

抽认卡游戏的一个例子(有点)

from threading import Timer, Thread, Event
from datetime import datetime


class perpetualTimer():

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        self.thread.start()

    def start(self):
        self.thread.start()

    def cancel(self):
        self.thread.cancel()


x = datetime.today()
start = x.second


def printer():
    global questions, counter, start
    x = datetime.today()
    tempo = x.second
    if tempo - 3 > start:
        show_ans()
    #print("
{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
    print()
    print("-" + questions[counter])
    counter += 1
    if counter == len(answers):
        counter = 0


def show_ans():
    global answers, c2
    print("It is {}".format(answers[c2]))
    c2 += 1
    if c2 == len(answers):
        c2 = 0


questions = ["What is the capital of Italy?",
             "What is the capital of France?",
             "What is the capital of England?",
             "What is the capital of Spain?"]

answers = "Rome", "Paris", "London", "Madrid"

counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()

输出:

Get ready to answer
>>> 
-What is the capital of Italy?
It is Rome

-What is the capital of France?
It is Paris

-What is the capital of England?
...

解决方案 7:

我有点晚了,但这是我的两点看法:

您可以threading.Timer通过.run()重复调用其方法来重用该对象,如下所示:

class SomeClassThatNeedsATimer:
    def __init__(...):
        self.timer = threading.Timer(interval, self.on_timer)
        self.timer.start()

    def on_timer(self):
        print('On timer')
        self.timer.run()

解决方案 8:

我必须为一个项目做这件事。我最终做的是为该函数启动一个单独的线程

t = threading.Thread(target =heartbeat, args=(worker,))
t.start()

heartbeat 是我的功能,worker 是我的参数之一

在我的心跳功能里面:

def heartbeat(worker):

    while True:
        time.sleep(5)
        #all of my code

因此,当我启动线程时,该函数将重复等待 5 秒,运行我的所有代码,并无限期地执行此操作。如果您想终止进程,只需终止线程即可。

解决方案 9:

除了上述使用线程的出色答案之外,如果您必须使用主线程或更喜欢异步方法 - 我在aio_timers Timer 类周围包装了一个简短的类(以启用重复)

import asyncio
from aio_timers import Timer

class RepeatingAsyncTimer():
    def __init__(self, interval, cb, *args, **kwargs):
        self.interval = interval
        self.cb = cb
        self.args = args
        self.kwargs = kwargs
        self.aio_timer = None
        self.start_timer()
    
    def start_timer(self):
        self.aio_timer = Timer(delay=self.interval, 
                               callback=self.cb_wrapper, 
                               callback_args=self.args, 
                               callback_kwargs=self.kwargs
                              )
    
    def cb_wrapper(self, *args, **kwargs):
        self.cb(*args, **kwargs)
        self.start_timer()


from time import time
def cb(timer_name):
    print(timer_name, time())

print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')

时钟开始于:16024388 40 .9690785

计时器_ 1 16024388 45 .980087

计时器_ 2 16024388 50 .9806316

计时器_ 1 16024388 50 .9808934

计时器_ 1 16024388 55 .9863033

计时器_ 2 16024388 60 .9868324

计时器_ 1 16024388 60 .9876585

解决方案 10:

我已经实现了一个用作计时器的类。

如果有人需要,我会在这里留下链接:
https: //github.com/ivanhalencp/python/tree/master/xTimer

解决方案 11:

from threading import Timer
def TaskManager():
    #do stuff
    t = Timer( 1, TaskManager )
    t.start()

TaskManager()

这是一个小示例,它将有助于更好地理解它是如何运行的。函数 taskManager() 在最后创建对其自身的延迟函数调用。

尝试改变“dalay”变量,你将能够看到差异

from threading import Timer, _sleep

# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True

def taskManager():

    global counter, DATA, delay, allow_run
    counter += 1

    if len(DATA) > 0:
        if FIFO:
            print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
        else:
            print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")

    else:
        print("["+str(counter)+"] no data")

    if allow_run:
        #delayed method/function call to it self
        t = Timer( dalay, taskManager )
        t.start()

    else:
        print(" END task-manager: disabled")

# ------------------------------------------
def main():

    DATA.append("data from main(): 0")
    _sleep(2)
    DATA.append("data from main(): 1")
    _sleep(2)


# ------------------------------------------
print(" START task-manager:")
taskManager()

_sleep(2)
DATA.append("first data")

_sleep(2)
DATA.append("second data")

print(" START main():")
main()
print(" END main():")

_sleep(2)
DATA.append("last data")

allow_run = False

解决方案 12:

我喜欢 right2clicky 的答案,尤其是它不需要每次计时器计时时拆除线程并创建新线程。此外,创建一个带有定期调用的计时器回调的类很容易覆盖。这是我的正常用例:

class MyClass(RepeatTimer):
    def __init__(self, period):
        super().__init__(period, self.on_timer)

    def on_timer(self):
        print("Tick")


if __name__ == "__main__":
    mc = MyClass(1)
    mc.start()
    time.sleep(5)
    mc.cancel()

解决方案 13:

这是使用函数而不是类的替代实现。灵感来自上面的@Andrew Wilkins。

因为 wait 比 sleep 更准确(它考虑了函数运行时间):

import threading

PING_ON = threading.Event()

def ping():
  while not PING_ON.wait(1):
    print("my thread %s" % str(threading.current_thread().ident))

t = threading.Thread(target=ping)
t.start()

sleep(5)
PING_ON.set()

解决方案 14:

我已经想出了另一个使用 SingleTon 类的解决方案。请告诉我这里是否存在内存泄漏。

import time,threading

class Singleton:
  __instance = None
  sleepTime = 1
  executeThread = False

  def __init__(self):
     if Singleton.__instance != None:
        raise Exception("This class is a singleton!")
     else:
        Singleton.__instance = self

  @staticmethod
  def getInstance():
     if Singleton.__instance == None:
        Singleton()
     return Singleton.__instance


  def startThread(self):
     self.executeThread = True
     self.threadNew = threading.Thread(target=self.foo_target)
     self.threadNew.start()
     print('doing other things...')


  def stopThread(self):
     print("Killing Thread ")
     self.executeThread = False
     self.threadNew.join()
     print(self.threadNew)


  def foo(self):
     print("Hello in " + str(self.sleepTime) + " seconds")


  def foo_target(self):
     while self.executeThread:
        self.foo()
        print(self.threadNew)
        time.sleep(self.sleepTime)

        if not self.executeThread:
           break


sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()

sClass.getInstance().sleepTime = 2
sClass.startThread()

解决方案 15:

为了按照 OP 的要求使用 Timer 提供正确的答案,我将改进https://stackoverflow.com/a/41450617/20750754

from threading import Timer


class InfiniteTimer():
  """A Timer class that does not stop, unless you want it to."""

  def __init__(self, seconds, target, args=[], kwargs=dict()):
    self._should_continue = False
    self.is_running = False
    self.seconds = seconds
    self.target = target
    self.args = args
    self.kwargs = kwargs
    self.thread = None

  def _handle_target(self):
    self.is_running = True
    self.target(*self.args, **self.kwargs)
    self.is_running = False
    self._start_timer()

  def _start_timer(self):
    if self._should_continue:  # Code could have been running when cancel was called.
      self.thread = Timer(
        self.seconds,
        self._handle_target,
      )
      self.thread.start()

  def start(self):
    if not self._should_continue and not self.is_running:
      self._should_continue = True
      self._start_timer()
    else:
      print(
        "Timer already started or running, please wait if you're restarting.")

  def cancel(self):
    if self.thread is not None:
      self._should_continue = False  # Just in case thread is running and cancel fails.
      self.thread.cancel()
    else:
      print("Timer never started or failed to initialize.")


def tick(i):
  print('ipsem lorem', i)


# Example Usage
t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
t.start()

解决方案 16:

这是持续运行计时器的示例代码。只需在耗尽时创建一个新计时器并调用相同的函数即可。这不是最好的方法,但也可以这样做。

import threading
import time


class ContinousTimer():
    def __init__(self):
        self.timer = None

    def run(self, msg='abc'):
        print(msg)

        self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
        self.timer.start()


if __name__ == "__main__":
    t = ContinousTimer()
    try:
        t.run(msg="Hello")
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Cancel Timer
        t.timer.cancel()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用