threading.Timer - 每 'n' 秒重复一次函数
- 2024-12-30 08:42:00
- admin 原创
- 49
问题描述:
我想每 0.5 秒触发一次函数,并能够启动、停止和重置计时器。我不太了解 Python 线程的工作原理,并且在使用 Python 计时器时遇到了困难。
但是,RuntimeError: threads can only be started once
当我执行两次时,我还是会遇到这种情况。有办法解决这个问题吗?我尝试在每次启动前threading.timer.start()
应用。threading.timer.cancel()
伪代码:
t=threading.timer(0.5,function)
while True:
t.cancel()
t.start()
解决方案 1:
最好的方法是启动一次定时器线程。在定时器线程中,你可以编写以下代码
class MyThread(Thread):
def __init__(self, event):
Thread.__init__(self)
self.stopped = event
def run(self):
while not self.stopped.wait(0.5):
print("my thread")
# call a function
在启动计时器的代码中,您可以使用set
停止事件来停止计时器。
stopFlag = Event()
thread = MyThread(stopFlag)
thread.start()
# this will stop the timer
stopFlag.set()
解决方案 2:
对Hans Then 的回答进行一些改进,我们可以直接将 Timer 函数子类化。以下成为我们整个“重复计时器”代码,它可以用作具有所有相同参数的 threading.Timer 的替代品:
from threading import Timer
class RepeatTimer(Timer):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
使用示例:
def dummyfn(msg="foo"):
print(msg)
timer = RepeatTimer(1, dummyfn)
timer.start()
time.sleep(5)
timer.cancel()
产生以下输出:
foo
foo
foo
foo
和
timer = RepeatTimer(1, dummyfn, args=("bar",))
timer.start()
time.sleep(5)
timer.cancel()
生产
bar
bar
bar
bar
解决方案 3:
来自Python 中 setInterval 的等效项:
import threading
def setInterval(interval):
def decorator(function):
def wrapper(*args, **kwargs):
stopped = threading.Event()
def loop(): # executed in another thread
while not stopped.wait(interval): # until stopped
function(*args, **kwargs)
t = threading.Thread(target=loop)
t.daemon = True # stop if the program exits
t.start()
return stopped
return wrapper
return decorator
用法:
@setInterval(.5)
def function():
"..."
stop = function() # start timer, the first call is in .5 seconds
stop.set() # stop the loop
stop = function() # start new timer
# ...
stop.set()
或者这是相同的功能,但是作为独立函数而不是装饰器:
cancel_future_calls = call_repeatedly(60, print, "Hello, World")
# ...
cancel_future_calls()
以下是不使用线程来实现此目的的方法。
解决方案 4:
使用计时器线程
from threading import Timer,Thread,Event
class perpetualTimer():
def __init__(self,t,hFunction):
self.t=t
self.hFunction = hFunction
self.thread = Timer(self.t,self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t,self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
print 'ipsem lorem'
t = perpetualTimer(5,printer)
t.start()
这可以通过以下方式阻止t.cancel()
解决方案 5:
为了按照 OP 的要求使用 Timer 提供正确的答案,我将改进swapnil jariwala 的答案:
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.thread = None
def _handle_target(self):
self.is_running = True
self.target()
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(self.seconds, self._handle_target)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print("Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick():
print('ipsem lorem')
# Example Usage
t = InfiniteTimer(0.5, tick)
t.start()
解决方案 6:
我更改了 swapnil-jariwala 代码中的某些代码来制作一个小型控制台时钟。
from threading import Timer, Thread, Event
from datetime import datetime
class PT():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def printer():
tempo = datetime.today()
h,m,s = tempo.hour, tempo.minute, tempo.second
print(f"{h}:{m}:{s}")
t = PT(1, printer)
t.start()
输出
>>> 11:39:11
11:39:12
11:39:13
11:39:14
11:39:15
11:39:16
...
带有 tkinter 图形界面的计时器
此代码将时钟计时器放在 tkinter 的一个小窗口中
from threading import Timer, Thread, Event
from datetime import datetime
import tkinter as tk
app = tk.Tk()
lab = tk.Label(app, text="Timer will start in a sec")
lab.pack()
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
def printer():
tempo = datetime.today()
clock = "{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second)
try:
lab['text'] = clock
except RuntimeError:
exit()
t = perpetualTimer(1, printer)
t.start()
app.mainloop()
抽认卡游戏的一个例子(有点)
from threading import Timer, Thread, Event
from datetime import datetime
class perpetualTimer():
def __init__(self, t, hFunction):
self.t = t
self.hFunction = hFunction
self.thread = Timer(self.t, self.handle_function)
def handle_function(self):
self.hFunction()
self.thread = Timer(self.t, self.handle_function)
self.thread.start()
def start(self):
self.thread.start()
def cancel(self):
self.thread.cancel()
x = datetime.today()
start = x.second
def printer():
global questions, counter, start
x = datetime.today()
tempo = x.second
if tempo - 3 > start:
show_ans()
#print("
{}:{}:{}".format(tempo.hour, tempo.minute, tempo.second), end="")
print()
print("-" + questions[counter])
counter += 1
if counter == len(answers):
counter = 0
def show_ans():
global answers, c2
print("It is {}".format(answers[c2]))
c2 += 1
if c2 == len(answers):
c2 = 0
questions = ["What is the capital of Italy?",
"What is the capital of France?",
"What is the capital of England?",
"What is the capital of Spain?"]
answers = "Rome", "Paris", "London", "Madrid"
counter = 0
c2 = 0
print("Get ready to answer")
t = perpetualTimer(3, printer)
t.start()
输出:
Get ready to answer
>>>
-What is the capital of Italy?
It is Rome
-What is the capital of France?
It is Paris
-What is the capital of England?
...
解决方案 7:
我有点晚了,但这是我的两点看法:
您可以threading.Timer
通过.run()
重复调用其方法来重用该对象,如下所示:
class SomeClassThatNeedsATimer:
def __init__(...):
self.timer = threading.Timer(interval, self.on_timer)
self.timer.start()
def on_timer(self):
print('On timer')
self.timer.run()
解决方案 8:
我必须为一个项目做这件事。我最终做的是为该函数启动一个单独的线程
t = threading.Thread(target =heartbeat, args=(worker,))
t.start()
heartbeat 是我的功能,worker 是我的参数之一
在我的心跳功能里面:
def heartbeat(worker):
while True:
time.sleep(5)
#all of my code
因此,当我启动线程时,该函数将重复等待 5 秒,运行我的所有代码,并无限期地执行此操作。如果您想终止进程,只需终止线程即可。
解决方案 9:
除了上述使用线程的出色答案之外,如果您必须使用主线程或更喜欢异步方法 - 我在aio_timers Timer 类周围包装了一个简短的类(以启用重复)
import asyncio
from aio_timers import Timer
class RepeatingAsyncTimer():
def __init__(self, interval, cb, *args, **kwargs):
self.interval = interval
self.cb = cb
self.args = args
self.kwargs = kwargs
self.aio_timer = None
self.start_timer()
def start_timer(self):
self.aio_timer = Timer(delay=self.interval,
callback=self.cb_wrapper,
callback_args=self.args,
callback_kwargs=self.kwargs
)
def cb_wrapper(self, *args, **kwargs):
self.cb(*args, **kwargs)
self.start_timer()
from time import time
def cb(timer_name):
print(timer_name, time())
print(f'clock starts at: {time()}')
timer_1 = RepeatingAsyncTimer(interval=5, cb=cb, timer_name='timer_1')
timer_2 = RepeatingAsyncTimer(interval=10, cb=cb, timer_name='timer_2')
时钟开始于:16024388 40 .9690785
计时器_ 1 16024388 45 .980087
计时器_ 2 16024388 50 .9806316
计时器_ 1 16024388 50 .9808934
计时器_ 1 16024388 55 .9863033
计时器_ 2 16024388 60 .9868324
计时器_ 1 16024388 60 .9876585
解决方案 10:
我已经实现了一个用作计时器的类。
如果有人需要,我会在这里留下链接:
https: //github.com/ivanhalencp/python/tree/master/xTimer
解决方案 11:
from threading import Timer
def TaskManager():
#do stuff
t = Timer( 1, TaskManager )
t.start()
TaskManager()
这是一个小示例,它将有助于更好地理解它是如何运行的。函数 taskManager() 在最后创建对其自身的延迟函数调用。
尝试改变“dalay”变量,你将能够看到差异
from threading import Timer, _sleep
# ------------------------------------------
DATA = []
dalay = 0.25 # sec
counter = 0
allow_run = True
FIFO = True
def taskManager():
global counter, DATA, delay, allow_run
counter += 1
if len(DATA) > 0:
if FIFO:
print("["+str(counter)+"] new data: ["+str(DATA.pop(0))+"]")
else:
print("["+str(counter)+"] new data: ["+str(DATA.pop())+"]")
else:
print("["+str(counter)+"] no data")
if allow_run:
#delayed method/function call to it self
t = Timer( dalay, taskManager )
t.start()
else:
print(" END task-manager: disabled")
# ------------------------------------------
def main():
DATA.append("data from main(): 0")
_sleep(2)
DATA.append("data from main(): 1")
_sleep(2)
# ------------------------------------------
print(" START task-manager:")
taskManager()
_sleep(2)
DATA.append("first data")
_sleep(2)
DATA.append("second data")
print(" START main():")
main()
print(" END main():")
_sleep(2)
DATA.append("last data")
allow_run = False
解决方案 12:
我喜欢 right2clicky 的答案,尤其是它不需要每次计时器计时时拆除线程并创建新线程。此外,创建一个带有定期调用的计时器回调的类很容易覆盖。这是我的正常用例:
class MyClass(RepeatTimer):
def __init__(self, period):
super().__init__(period, self.on_timer)
def on_timer(self):
print("Tick")
if __name__ == "__main__":
mc = MyClass(1)
mc.start()
time.sleep(5)
mc.cancel()
解决方案 13:
这是使用函数而不是类的替代实现。灵感来自上面的@Andrew Wilkins。
因为 wait 比 sleep 更准确(它考虑了函数运行时间):
import threading
PING_ON = threading.Event()
def ping():
while not PING_ON.wait(1):
print("my thread %s" % str(threading.current_thread().ident))
t = threading.Thread(target=ping)
t.start()
sleep(5)
PING_ON.set()
解决方案 14:
我已经想出了另一个使用 SingleTon 类的解决方案。请告诉我这里是否存在内存泄漏。
import time,threading
class Singleton:
__instance = None
sleepTime = 1
executeThread = False
def __init__(self):
if Singleton.__instance != None:
raise Exception("This class is a singleton!")
else:
Singleton.__instance = self
@staticmethod
def getInstance():
if Singleton.__instance == None:
Singleton()
return Singleton.__instance
def startThread(self):
self.executeThread = True
self.threadNew = threading.Thread(target=self.foo_target)
self.threadNew.start()
print('doing other things...')
def stopThread(self):
print("Killing Thread ")
self.executeThread = False
self.threadNew.join()
print(self.threadNew)
def foo(self):
print("Hello in " + str(self.sleepTime) + " seconds")
def foo_target(self):
while self.executeThread:
self.foo()
print(self.threadNew)
time.sleep(self.sleepTime)
if not self.executeThread:
break
sClass = Singleton()
sClass.startThread()
time.sleep(5)
sClass.getInstance().stopThread()
sClass.getInstance().sleepTime = 2
sClass.startThread()
解决方案 15:
为了按照 OP 的要求使用 Timer 提供正确的答案,我将改进https://stackoverflow.com/a/41450617/20750754
from threading import Timer
class InfiniteTimer():
"""A Timer class that does not stop, unless you want it to."""
def __init__(self, seconds, target, args=[], kwargs=dict()):
self._should_continue = False
self.is_running = False
self.seconds = seconds
self.target = target
self.args = args
self.kwargs = kwargs
self.thread = None
def _handle_target(self):
self.is_running = True
self.target(*self.args, **self.kwargs)
self.is_running = False
self._start_timer()
def _start_timer(self):
if self._should_continue: # Code could have been running when cancel was called.
self.thread = Timer(
self.seconds,
self._handle_target,
)
self.thread.start()
def start(self):
if not self._should_continue and not self.is_running:
self._should_continue = True
self._start_timer()
else:
print(
"Timer already started or running, please wait if you're restarting.")
def cancel(self):
if self.thread is not None:
self._should_continue = False # Just in case thread is running and cancel fails.
self.thread.cancel()
else:
print("Timer never started or failed to initialize.")
def tick(i):
print('ipsem lorem', i)
# Example Usage
t = InfiniteTimer(0.5, tick, kwargs=dict(i="i"))
t.start()
解决方案 16:
这是持续运行计时器的示例代码。只需在耗尽时创建一个新计时器并调用相同的函数即可。这不是最好的方法,但也可以这样做。
import threading
import time
class ContinousTimer():
def __init__(self):
self.timer = None
def run(self, msg='abc'):
print(msg)
self.timer = threading.Timer(interval=2, function=self.run, args=(msg, ))
self.timer.start()
if __name__ == "__main__":
t = ContinousTimer()
try:
t.run(msg="Hello")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
# Cancel Timer
t.timer.cancel()
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理必备:盘点2024年13款好用的项目管理软件