如何每 x 秒重复执行一个函数?
- 2024-11-21 08:33:00
- admin 原创
- 4
问题描述:
我想永久地每 60 秒重复执行一次 Python 函数(就像Objective C 中的NSTimer或 JS 中的 setTimeout 一样)。此代码将作为守护进程运行,实际上就像使用 cron 每分钟调用一次 Python 脚本一样,但不需要用户进行设置。
在这个关于用 Python 实现的 cron 的问题中,解决方案似乎实际上只是sleep() x 秒。我不需要如此高级的功能,所以也许像这样的方法可以工作
while True:
# Code executed here
time.sleep(60)
该代码是否存在可预见的问题?
解决方案 1:
如果您的程序还没有事件循环,请使用sched模块,它实现了通用事件调度程序。
import sched, time
def do_something(scheduler):
# schedule the next call first
scheduler.enter(60, 1, do_something, (scheduler,))
print("Doing stuff...")
# then do your stuff
my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()
如果您已经在使用事件循环库(如、、、、、、asyncio
等) - 只需使用现有事件循环库的方法来安排任务。trio
`tkinterPyQt5
gobject`kivy
解决方案 2:
将时间循环锁定到系统时钟,如下所示:
import time
starttime = time.monotonic()
while True:
print("tick")
time.sleep(60.0 - ((time.monotonic() - starttime) % 60.0))
使用“单调”时钟正常工作;time()通过太阳/法定时间变化、ntp 同步等进行调整......
解决方案 3:
如果您想要一种非阻塞方式定期执行函数,而不是阻塞无限循环,我会使用线程计时器。这样您的代码就可以继续运行并执行其他任务,并且仍然每 n 秒调用一次函数。我经常使用这种技术来打印耗时长、占用大量 CPU/磁盘/网络的任务的进度信息。
这是我在类似问题中发布的代码,其中包含 start() 和 stop() 控制:
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
用法:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
特征:
仅限标准库,无外部依赖
start()
`stop()`即使计时器已启动/停止,也可以安全地多次调用被调用的函数可以有位置参数和命名参数
您可以随时更改
interval
,它将在下次运行后生效。args
和kwargs
也一样function
!
解决方案 4:
您可能需要考虑Twisted ,它是一个实现Reactor 模式的 Python 网络库。
from twisted.internet import task, reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
虽然“while True: sleep(60)”可能会起作用,但 Twisted 可能已经实现了许多你最终需要的功能(bobince 指出的守护进程、日志记录或异常处理),并且可能是一个更强大的解决方案
解决方案 5:
这是 MestreLion 代码的更新,可以避免随着时间的推移而出现漂移。
此处的 RepeatedTimer 类按照 OP 的要求每“间隔”秒调用一次给定函数;计划不取决于函数执行所需的时间。我喜欢这个解决方案,因为它没有外部库依赖项;这只是纯 Python。
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
示例用法(从 MestreLion 的答案复制而来):
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
解决方案 6:
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
如果您想在不阻止剩余代码的情况下执行此操作,您可以使用它让它在自己的线程中运行:
import threading
threading.Thread(target=lambda: every(5, foo)).start()
该解决方案结合了其他解决方案中很少见的几个功能:
异常处理:在这个级别上,尽可能地正确处理异常,即在不中止程序的情况下记录异常以用于调试目的。
无链接:您在许多答案中发现的常见链式实现(用于安排下一个事件)在以下方面很脆弱:如果调度机制(
threading.Timer
或其他任何机制)出现任何问题,这将终止链。即使问题的原因已经解决,也不会发生进一步的执行。相比之下,简单的循环和简单的等待sleep()
要稳健得多。无偏差:我的解决方案精确跟踪其应运行的时间。不会因执行时间而出现偏差(许多其他解决方案也是如此)。
跳过:如果一次执行花费的时间太长(例如,每五秒执行一次 X,但 X 花费了 6 秒),我的解决方案将跳过任务。这是标准的 cron 行为(并且有充分的理由)。许多其他解决方案则只是连续多次执行任务而没有任何延迟。在大多数情况下(例如清理任务),这并不是我们所希望的。如果希望,只需使用即可
next_time += delay
。
解决方案 7:
我认为更简单的方法是:
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
这样,您的代码就会执行,然后等待 60 秒,然后再次执行,等待,执行等...无需使事情复杂化:D
解决方案 8:
我最终使用了计划模块。API 很好。
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)
while True:
schedule.run_pending()
time.sleep(1)
解决方案 9:
替代的灵活性解决方案是Apscheduler。
pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
pass
sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds
sched.start()
此外,apscheduler 还提供如下许多调度程序。
BlockingScheduler:当调度程序是进程中唯一运行的程序时使用
BackgroundScheduler:当你不使用下面任何框架时使用,并且希望调度程序在应用程序的后台运行
AsyncIOScheduler:如果你的应用程序使用 asyncio 模块,请使用
GeventScheduler:如果你的应用程序使用 gevent,请使用
TornadoScheduler:如果你正在构建 Tornado 应用程序,请使用
TwistedScheduler:如果你正在构建 Twisted 应用程序,请使用
QtScheduler:如果你正在构建 Qt 应用程序,请使用
解决方案 10:
如果不考虑漂移
import threading, time
def print_every_n_seconds(n=2):
while True:
print(time.ctime())
time.sleep(n)
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()
异步输出。
#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018
如果运行的任务需要相当长的时间,那么间隔将变成 2 秒 + 任务时间,因此如果您需要精确的调度,那么这不适合您。
请注意,该daemon=True
标志表示此线程不会阻止应用程序关闭。例如,pytest
在运行测试后等待此线程停止时,会出现无限期挂起的问题。
解决方案 11:
我以前也遇到过类似的问题。http: //cronus.readthedocs.org可能有帮助吗?
对于 v0.2,以下代码片段有效
import cronus.beat as beat
beat.set_rate(2) # run twice per second
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
解决方案 12:
它和 cron 的主要区别在于,异常将彻底终止守护进程。您可能需要使用异常捕获器和记录器进行包装。
解决方案 13:
只需使用
import time
while True:
print("this will run after every 30 sec")
#Your code here
time.sleep(30)
解决方案 14:
一个可能的答案是:
import time
t=time.time()
while True:
if time.time()-t>10:
#run your task here
t=time.time()
解决方案 15:
我使用 Tkinter after() 方法,它不会“窃取游戏”(就像之前介绍的sched模块一样),即它允许其他事情并行运行:
import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
do_something1()
并且do_something2()
可以并行运行,并且以任意间隔速度运行。在这里,第二个函数的执行速度是原来的两倍。还请注意,我使用了一个简单的计数器作为终止任一函数的条件。您可以使用任何其他您喜欢的条件,或者如果您希望函数运行到程序终止(例如时钟),则可以不使用任何条件。
解决方案 16:
这是 MestreLion 代码的改编版本。除了原始函数外,此代码还:
1)添加 first_interval 用于在特定时间触发计时器(调用者需要计算 first_interval 并传入)
2)解决原代码中的竞争条件。在原代码中,如果控制线程无法取消正在运行的计时器(“停止计时器,并取消执行计时器的操作。这仅在计时器仍处于等待阶段时才有效。”引自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。
class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
self.timer = None
self.first_interval = first_interval
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.running = False
self.is_started = False
def first_start(self):
try:
# no race-condition here because only control thread will call this method
# if already started will not start again
if not self.is_started:
self.is_started = True
self.timer = Timer(self.first_interval, self.run)
self.running = True
self.timer.start()
except Exception as e:
log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
raise
def run(self):
# if not stopped start again
if self.running:
self.timer = Timer(self.interval, self.run)
self.timer.start()
self.func(*self.args, **self.kwargs)
def stop(self):
# cancel current timer in case failed it's still OK
# if already stopped doesn't matter to stop again
if self.timer:
self.timer.cancel()
self.running = False
解决方案 17:
这是另一种无需使用任何额外库的解决方案。
def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
"""Delay using a boolean callable function.
`condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
It can break early if condition is met.
Args:
condition_fn - a callable boolean function
interval_in_sec - wait time between calling `condition_fn`
timeout_in_sec - maximum time to run
Returns: None
"""
start = last_call = time.time()
while time.time() - start < timeout_in_sec:
if (time.time() - last_call) > interval_in_sec:
if condition_fn() is True:
break
last_call = time.time()
解决方案 18:
我使用它来每小时引发 60 个事件,其中大多数事件发生在整分钟后的相同秒数:
import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)
info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK
while 1:
set_timing()
print('hello world')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info['tick'] += 1
info['completion_time'] = time.time()
根据实际情况,您可能会获得长度的刻度:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
但在 60 分钟后,您将有 60 个刻度;并且大多数刻度都会以您喜欢的分钟的正确偏移量出现。
在我的系统中,我得到的典型漂移是<1/20秒,直到需要校正。
该方法的优点是解决了时钟漂移问题;如果您执行的操作(例如每刻添加一个项目,并且您期望每小时添加 60 个项目)可能会导致问题。不考虑漂移可能会导致移动平均线等次要指标考虑太久远的过去数据,从而导致输出错误。
解决方案 19:
例如,显示当前本地时间
import datetime
import glib
import logger
def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)
def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True
# call every minute
glib.timeout_add(60*1000, display_local_time)
解决方案 20:
timed-count可以实现高精度(即 < 1 毫秒),因为它与系统时钟同步。它不会随时间漂移,也不会受到代码执行时间长度的影响(当然,前提是该时间小于间隔周期)。
一个简单的阻塞示例:
from timed_count import timed_count
for count in timed_count(60):
# Execute code here exactly every 60 seconds
...
您可以通过在线程中运行它轻松地使其非阻塞:
from threading import Thread
from timed_count import timed_count
def periodic():
for count in timed_count(60):
# Execute code here exactly every 60 seconds
...
thread = Thread(target=periodic)
thread.start()
解决方案 21:
''' tracking number of times it prints'''
import threading
global timeInterval
count=0
def printit():
threading.Timer(timeInterval, printit).start()
print( "Hello, World!")
global count
count=count+1
print(count)
printit
if __name__ == "__main__":
timeInterval= int(input('Enter Time in Seconds:'))
printit()
解决方案 22:
我认为这取决于你想做什么,而且你的问题没有具体说明很多细节。
对我来说,我想在我已经有多线程的进程中执行一项昂贵的操作。因此,我让该领导进程检查时间,并且只有她执行昂贵的操作(检查深度学习模型)。为此,我增加了计数器以确保每 5 秒保存一次,然后是 10 秒,然后是 15 秒(或使用 math.floor 的模运算):
def print_every_5_seconds_have_passed_exit_eventually():
"""
https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
:return:
"""
opts = argparse.Namespace(start=time.time())
next_time_to_print = 0
while True:
current_time_passed = time.time() - opts.start
if current_time_passed >= next_time_to_print:
next_time_to_print += 5
print(f'worked and {current_time_passed=}')
print(f'{current_time_passed % 5=}')
print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True
对我来说,检查 if 语句就是我所需要的。如果可以避免的话,在我已经很复杂的多处理多 GPU 代码中添加线程和调度程序并不是我想要增加的复杂性,而且看起来我可以避免。检查工作器 ID 很容易确保只有 1 个进程在执行此操作。
注意,我使用了 True 打印语句来确保模块化算法技巧有效,因为检查确切时间显然行不通!但令我惊喜的是,地板确实奏效了。
解决方案 23:
我一直在寻找一个在单独线程上运行的周期性计时器,这样对象本身仍然可以执行一些工作,例如绘制当前状态。我还希望能够改变周期的速度,比如说,在计划开始时,我希望每 3 秒执行一次工作,几个小时后,我希望将其更改为每 1 小时执行一次等。我一直在寻找 asyncio 和其他库,但效果并不理想。我正在使用 Spyder IDE,尤其是 asyncio 有其自身的局限性(例如使用 nested_asyncio)。我还想随时终止进程,所以这是我的解决方案,它完全可以满足我的所有需求。希望它能对某人有所帮助:
import threading
import time
from datetime import datetime, timedelta
class MultiThreading:
def __init__(self):
self.thread = None
self.started = False
self.time = 5
self.n = 0
def task(self):
self.n += 1
print("Job task do your job {:d}".format(self.n))
def threaded_program(self):
while self.started == True:
time.sleep(1)
if datetime.now() >= self.datetime:
self.task()
self.datetime += timedelta(0, self.time)
else:
pass
def run(self):
self.datetime = datetime.now()
self.datetime += timedelta(0, self.time)
self.started = True
self.task()
self.thread = threading.Thread(target=self.threaded_program, args=())
self.thread.start()
def test(self):
print("test")
def stop(self):
print("stopping")
self.started = False
self.thread.join()
使用方法很简单:
mt = MultiThreading()
mt.run()
mt.test()
因为睡眠时间设置为1秒,所以它可以每秒取消一次线程。
mt.stop()
其目的是为了在实验室中进行自动测量,其中需要不时操作由不同类控制的设备。这些周期随着实验的进展而变化。
- 2024年20款好用的项目管理软件推荐,项目管理提效的20个工具和技巧
- 2024年开源项目管理软件有哪些?推荐5款好用的项目管理工具
- 项目管理软件有哪些?推荐7款超好用的项目管理工具
- 项目管理软件哪个最好用?盘点推荐5款好用的项目管理工具
- 项目管理软件有哪些最好用?推荐6款好用的项目管理工具
- 项目管理软件有哪些,盘点推荐国内外超好用的7款项目管理工具
- 2024项目管理软件排行榜(10类常用的项目管理工具全推荐)
- 项目管理软件排行榜:2024年项目经理必备5款开源项目管理软件汇总
- 2024年常用的项目管理软件有哪些?推荐这10款国内外好用的项目管理工具
- 项目管理必备:盘点2024年13款好用的项目管理软件