执行定期操作[重复]
- 2025-01-03 08:41:00
- admin 原创
- 87
问题描述:
我在 Windows 上工作。我想foo()
每 10 秒执行一次函数。
我该怎么做?
解决方案 1:
在 的末尾foo()
,创建一个在 10 秒后Timer
调用自身的函数。
因为,创建一个新的来调用。
您可以做其他事情而不会被阻止。foo()
Timer
`thread`foo()
import time, threading
def foo():
print(time.ctime())
threading.Timer(10, foo).start()
foo()
#output:
#Thu Dec 22 14:46:08 2011
#Thu Dec 22 14:46:18 2011
#Thu Dec 22 14:46:28 2011
#Thu Dec 22 14:46:38 2011
解决方案 2:
只需休眠 10 秒或使用threading.Timer(10,foo)
就会导致启动时间漂移。(您可能不关心这一点,或者根据您的具体情况,这可能是一个重大问题来源。)造成这种情况的原因可能有两个 - 线程唤醒时间不准确或函数执行时间不准确。
您可以在本文末尾看到一些结果,但首先要看一个如何修复它的示例。您需要跟踪您的函数下一次应被调用的时间,而不是实际被调用的时间,并解释其中的差异。
以下是略有不同之处的版本:
import datetime, threading
def foo():
print datetime.datetime.now()
threading.Timer(1, foo).start()
foo()
其输出如下所示:
2013-08-12 13:05:36.483580
2013-08-12 13:05:37.484931
2013-08-12 13:05:38.485505
2013-08-12 13:05:39.486945
2013-08-12 13:05:40.488386
2013-08-12 13:05:41.489819
2013-08-12 13:05:42.491202
2013-08-12 13:05:43.492486
2013-08-12 13:05:44.493865
2013-08-12 13:05:45.494987
2013-08-12 13:05:46.496479
2013-08-12 13:05:47.497824
2013-08-12 13:05:48.499286
2013-08-12 13:05:49.500232
您可以看到亚秒计数在不断增加,因此开始时间正在“漂移”。
这是正确考虑漂移的代码:
import datetime, threading, time
next_call = time.time()
def foo():
global next_call
print datetime.datetime.now()
next_call = next_call+1
threading.Timer( next_call - time.time(), foo ).start()
foo()
其输出如下所示:
2013-08-12 13:21:45.292565
2013-08-12 13:21:47.293000
2013-08-12 13:21:48.293939
2013-08-12 13:21:49.293327
2013-08-12 13:21:50.293883
2013-08-12 13:21:51.293070
2013-08-12 13:21:52.293393
这里你可以看到亚秒级的时间不再有任何增加。
如果您的事件发生得非常频繁,您可能希望在单个线程中运行计时器,而不是为每个事件启动一个新线程。考虑到漂移,这看起来像:
import datetime, threading, time
def foo():
next_call = time.time()
while True:
print datetime.datetime.now()
next_call = next_call+1;
time.sleep(next_call - time.time())
timerThread = threading.Thread(target=foo)
timerThread.start()
但是,您的应用程序不会正常退出,您需要终止计时器线程。如果您想在应用程序完成后正常退出,而无需手动终止线程,您应该使用
timerThread = threading.Thread(target=foo)
timerThread.daemon = True
timerThread.start()
解决方案 3:
很惊讶没有找到使用发电机计时的解决方案。我只是为了自己的目的而设计的。
该解决方案:单线程,每个周期没有对象实例,使用生成器进行计时,严格控制模块的精度time
(与我从堆栈交换中尝试过的几种解决方案不同)。
注意:对于 Python 2.x,将next(g)
以下内容替换为g.next()
。
import time
def do_every(period,f,*args):
def g_tick():
t = time.time()
while True:
t += period
yield max(t - time.time(),0)
g = g_tick()
while True:
time.sleep(next(g))
f(*args)
def hello(s):
print('hello {} ({:.4f})'.format(s,time.time()))
time.sleep(.3)
do_every(1,hello,'foo')
例如,结果为:
hello foo (1421705487.5811)
hello foo (1421705488.5811)
hello foo (1421705489.5809)
hello foo (1421705490.5830)
hello foo (1421705491.5803)
hello foo (1421705492.5808)
hello foo (1421705493.5811)
hello foo (1421705494.5811)
hello foo (1421705495.5810)
hello foo (1421705496.5811)
hello foo (1421705497.5810)
hello foo (1421705498.5810)
hello foo (1421705499.5809)
hello foo (1421705500.5811)
hello foo (1421705501.5811)
hello foo (1421705502.5811)
hello foo (1421705503.5810)
请注意,此示例包括模拟 CPU 在每个周期内执行其他操作 0.3 秒。如果您将其更改为每次随机执行,则不会有任何问题。行中的最大值yield
用于防止sleep
出现负数,以防调用的函数花费的时间超过指定的周期。在这种情况下,它将立即执行并在下次执行时弥补丢失的时间。
解决方案 4:
也许sched 模块可以满足您的需求。
或者,考虑使用Timer 对象。
解决方案 5:
这将在每次调用之间插入 10 秒钟的睡眠时间foo()
,如果调用快速完成,这大约就是您所要求的时间。
import time
while True:
foo()
time.sleep(10)
foo()
在后台线程中调用时执行其他操作
import time
import sys
import threading
def foo():
sys.stdout.write('({}) foo
'.format(time.ctime()))
def foo_target():
while True:
foo()
time.sleep(10)
t = threading.Thread(target=foo_target)
t.daemon = True
t.start()
print('doing other things...')
解决方案 6:
这是一个使用 Thread 类的很好的实现: http: //g-off.net/software/a-python-repeatable-threadingtimer-class
下面的代码稍微有点快速和粗糙:
from threading import Timer
from time import sleep
def hello():
print "hello, world"
t = Timer(3,hello)
t.start()
t = Timer(3, hello)
t.start() # after 3 seconds, "hello, world" will be printed
# timer will wake up ever 3 seconds, while we do something else
while True:
print "do something else"
sleep(10)
解决方案 7:
您可以在不同的线程中执行任务。threading.Timer
将允许您在一段时间后执行一次给定的回调,如果您想执行您的任务,例如,只要回调返回True
(这实际上是glib.timeout_add
提供的,但您可能没有在 Windows 中安装它)或直到您取消它,您可以使用此代码:
import logging, threading, functools
import time
logging.basicConfig(level=logging.NOTSET,
format='%(threadName)s %(message)s')
class PeriodicTimer(object):
def __init__(self, interval, callback):
self.interval = interval
@functools.wraps(callback)
def wrapper(*args, **kwargs):
result = callback(*args, **kwargs)
if result:
self.thread = threading.Timer(self.interval,
self.callback)
self.thread.start()
self.callback = wrapper
def start(self):
self.thread = threading.Timer(self.interval, self.callback)
self.thread.start()
def cancel(self):
self.thread.cancel()
def foo():
logging.info('Doing some work...')
return True
timer = PeriodicTimer(1, foo)
timer.start()
for i in range(2):
time.sleep(2)
logging.info('Doing some other work...')
timer.cancel()
示例输出:
Thread-1 Doing some work...
Thread-2 Doing some work...
MainThread Doing some other work...
Thread-3 Doing some work...
Thread-4 Doing some work...
MainThread Doing some other work...
注意:回调并非每次间隔执行都会执行。间隔是线程在上一次回调完成和下一次回调调用之间等待的时间。
解决方案 8:
这是一个简单的单线程睡眠版本,它会出现漂移,但当它检测到漂移时会尝试自动纠正。
注意:只有满足以下 3 个合理假设,这才会起作用:
该时间段远大于正在执行的函数的执行时间
每次调用执行的函数所花的时间大致相同
通话之间的漂移量小于一秒
-
from datetime import timedelta
from datetime import datetime
def exec_every_n_seconds(n,f):
first_called=datetime.now()
f()
num_calls=1
drift=timedelta()
time_period=timedelta(seconds=n)
while 1:
time.sleep(n-drift.microseconds/1000000.0)
current_time = datetime.now()
f()
num_calls += 1
difference = current_time - first_called
drift = difference - time_period* num_calls
print "drift=",drift