如何每 x 秒重复执行一个函数?

2024-11-21 08:33:00
admin
原创
4
摘要:问题描述:我想永久地每 60 秒重复执行一次 Python 函数(就像Objective C 中的NSTimer或 JS 中的 setTimeout 一样)。此代码将作为守护进程运行,实际上就像使用 cron 每分钟调用一次 Python 脚本一样,但不需要用户进行设置。在这个关于用 Python 实现的 c...

问题描述:

我想永久地每 60 秒重复执行一次 Python 函数(就像Objective C 中的NSTimer或 JS 中的 setTimeout 一样)。此代码将作为守护进程运行,实际上就像使用 cron 每分钟调用一次 Python 脚本一样,但不需要用户进行设置。

在这个关于用 Python 实现的 cron 的问题中,解决方案似乎实际上只是sleep() x 秒。我不需要如此高级的功能,所以也许像这样的方法可以工作

while True:
    # Code executed here
    time.sleep(60)

该代码是否存在可预见的问题?


解决方案 1:

如果您的程序还没有事件循环,请使用sched模块,它实现了通用事件调度程序。

import sched, time

def do_something(scheduler): 
    # schedule the next call first
    scheduler.enter(60, 1, do_something, (scheduler,))
    print("Doing stuff...")
    # then do your stuff

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()

如果您已经在使用事件循环库(如、、、、、、asyncio等) - 只需使用现有事件循环库的方法来安排任务。trio`tkinterPyQt5gobject`kivy

解决方案 2:

将时间循环锁定到系统时钟,如下所示:

import time
starttime = time.monotonic()
while True:
    print("tick")
    time.sleep(60.0 - ((time.monotonic() - starttime) % 60.0))

使用“单调”时钟正常工作;time()通过太阳/法定时间变化、ntp 同步等进行调整......

解决方案 3:

如果您想要一种非阻塞方式定期执行函数,而不是阻塞无限循环,我会使用线程计时器。这样您的代码就可以继续运行并执行其他任务,并且仍然每 n 秒调用一次函数。我经常使用这种技术来打印耗时长、占用大量 CPU/磁盘/网络的任务的进度信息。

这是我在类似问题中发布的代码,其中包含 start() 和 stop() 控制:

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

用法:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

特征:

  • 仅限标准库,无外部依赖

  • start()`stop()`即使计时器已启动/停止,也可以安全地多次调用

  • 被调用的函数可以有位置参数和命名参数

  • 您可以随时更改interval,它将在下次运行后生效。argskwargs也一样function

解决方案 4:

您可能需要考虑Twisted ,它是一个实现Reactor 模式的 Python 网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“while True: sleep(60)”可能会起作用,但 Twisted 可能已经实现了许多你最终需要的功能(bobince 指出的守护进程、日志记录或异常处理),并且可能是一个更强大的解决方案

解决方案 5:

这是 MestreLion 代码的更新,可以避免随着时间的推移而出现漂移。

此处的 RepeatedTimer 类按照 OP 的要求每“间隔”秒调用一次给定函数;计划不取决于函数执行所需的时间。我喜欢这个解决方案,因为它没有外部库依赖项;这只是纯 Python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(从 MestreLion 的答案复制而来):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

解决方案 6:

import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

如果您想在不阻止剩余代码的情况下执行此操作,您可以使用它让它在自己的线程中运行:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

该解决方案结合了其他解决方案中很少见的几个功能:

  • 异常处理:在这个级别上,尽可能地正确处理异常,即在不中止程序的情况下记录异常以用于调试目的。

  • 无链接:您在许多答案中发现的常见链式实现(用于安排下一个事件)在以下方面很脆弱:如果调度机制(threading.Timer或其他任何机制)出现任何问题,这将终止链。即使问题的原因已经解决,也不会发生进一步的执行。相比之下,简单的循环和简单的等待sleep()要稳健得多。

  • 无偏差:我的解决方案精确跟踪其应运行的时间。不会因执行时间而出现偏差(许多其他解决方案也是如此)。

  • 跳过:如果一次执行花费的时间太长(例如,每五秒执行一次 X,但 X 花费了 6 秒),我的解决方案将跳过任务。这是标准的 cron 行为(并且有充分的理由)。许多其他解决方案则只是连续多次执行任务而没有任何延迟。在大多数情况下(例如清理任务),这并不是我们所希望的。如果希望,只需使用即可next_time += delay

解决方案 7:

我认为更简单的方法是:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

这样,您的代码就会执行,然后等待 60 秒,然后再次执行,等待,执行等...无需使事情复杂化:D

解决方案 8:

我最终使用了计划模块。API 很好。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

解决方案 9:

替代的灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

此外,apscheduler 还提供如下许多调度程序。

  • BlockingScheduler:当调度程序是进程中唯一运行的程序时使用

  • BackgroundScheduler:当你不使用下面任何框架时使用,并且希望调度程序在应用程序的后台运行

  • AsyncIOScheduler:如果你的应用程序使用 asyncio 模块,请使用

  • GeventScheduler:如果你的应用程序使用 gevent,请使用

  • TornadoScheduler:如果你正在构建 Tornado 应用程序,请使用

  • TwistedScheduler:如果你正在构建 Twisted 应用程序,请使用

  • QtScheduler:如果你正在构建 Qt 应用程序,请使用

解决方案 10:

如果不考虑漂移

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果运行的任务需要相当长的时间,那么间隔将变成 2 秒 + 任务时间,因此如果您需要精确的调度,那么这不适合您。

请注意,该daemon=True标志表示此线程不会阻止应用程序关闭。例如,pytest在运行测试后等待此线程停止时,会出现无限期挂起的问题。

解决方案 11:

我以前也遇到过类似的问题。http: //cronus.readthedocs.org可能有帮助吗?

对于 v0.2,以下代码片段有效

import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

解决方案 12:

它和 cron 的主要区别在于,异常将彻底终止守护进程。您可能需要使用异常捕获器和记录器进行包装。

解决方案 13:

只需使用

import time

while True:
    print("this will run after every 30 sec")
    #Your code here
    time.sleep(30)

解决方案 14:

一个可能的答案是:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

解决方案 15:

我使用 Tkinter after() 方法,它不会“窃取游戏”(就像之前介绍的sched模块一样),即它允许其他事情并行运行:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1()并且do_something2()可以并行运行,并且以任意间隔速度运行。在这里,第二个函数的执行速度是原来的两倍。还请注意,我使用了一个简单的计数器作为终止任一函数的条件。您可以使用任何其他您喜欢的条件,或者如果您希望函数运行到程序终止(例如时钟),则可以不使用任何条件。

解决方案 16:

这是 MestreLion 代码的改编版本。除了原始函数外,此代码还:

1)添加 first_interval 用于在特定时间触发计时器(调用者需要计算 first_interval 并传入)

2)解决原代码中的竞争条件。在原代码中,如果控制线程无法取消正在运行的计时器(“停止计时器,并取消执行计时器的操作。这仅在计时器仍处于等待阶段时才有效。”引自https://docs.python.org/2/library/threading.html),计时器将无休止地运行。

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False

解决方案 17:

这是另一种无需使用任何额外库的解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()

解决方案 18:

我使用它来每小时引发 60 个事件,其中大多数事件发生在整分钟后的相同秒数:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

根据实际情况,您可能会获得长度的刻度:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

但在 60 分钟后,您将有 60 个刻度;并且大多数刻度都会以您喜欢的分钟的正确偏移量出现。

在我的系统中,我得到的典型漂移是<1/20秒,直到需要校正。

该方法的优点是解决了时钟漂移问题;如果您执行的操作(例如每刻添加一个项目,并且您期望每小时添加 60 个项目)可能会导致问题。不考虑漂移可能会导致移动平均线等次要指标考虑太久远的过去数据,从而导致输出错误。

解决方案 19:

例如,显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

解决方案 20:

timed-count可以实现高精度(即 < 1 毫秒),因为它与系统时钟同步。它不会随时间漂移,也不会受到代码执行时间长度的影响(当然,前提是该时间小于间隔周期)。

一个简单的阻塞示例:

from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

您可以通过在线程中运行它轻松地使其非阻塞:

from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()

解决方案 21:

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()

解决方案 22:

我认为这取决于你想做什么,而且你的问题没有具体说明很多细节。

对我来说,我想在我已经有多线程的进程中执行一项昂贵的操作。因此,我让该领导进程检查时间,并且只有她执行昂贵的操作(检查深度学习模型)。为此,我增加了计数器以确保每 5 秒保存一次,然后是 10 秒,然后是 15 秒(或使用 math.floor 的模运算):

def print_every_5_seconds_have_passed_exit_eventually():
    """
    https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds
    https://stackoverflow.com/questions/474528/what-is-the-best-way-to-repeatedly-execute-a-function-every-x-seconds
    :return:
    """
    opts = argparse.Namespace(start=time.time())
    next_time_to_print = 0
    while True:
        current_time_passed = time.time() - opts.start
        if current_time_passed >= next_time_to_print:
            next_time_to_print += 5
            print(f'worked and {current_time_passed=}')
            print(f'{current_time_passed % 5=}')
            print(f'{math.floor(current_time_passed % 5) == 0}')
starting __main__ at __init__
worked and current_time_passed=0.0001709461212158203
current_time_passed % 5=0.0001709461212158203
True
worked and current_time_passed=5.0
current_time_passed % 5=0.0
True
worked and current_time_passed=10.0
current_time_passed % 5=0.0
True
worked and current_time_passed=15.0
current_time_passed % 5=0.0
True

对我来说,检查 if 语句就是我所需要的。如果可以避免的话,在我已经很复杂的多处理多 GPU 代码中添加线程和调度程序并不是我想要增加的复杂性,而且看起来我可以避免。检查工作器 ID 很容易确保只有 1 个进程在执行此操作。

注意,我使用了 True 打印语句来确保模块化算法技巧有效,因为检查确切时间显然行不通!但令我惊喜的是,地板确实奏效了。

解决方案 23:

我一直在寻找一个在单独线程上运行的周期性计时器,这样对象本身仍然可以执行一些工作,例如绘制当前状态。我还希望能够改变周期的速度,比如说,在计划开始时,我希望每 3 秒执行一次工作,几个小时后,我希望将其更改为每 1 小时执行一次等。我一直在寻找 asyncio 和其他库,但效果并不理想。我正在使用 Spyder IDE,尤其是 asyncio 有其自身的局限性(例如使用 nested_asyncio)。我还想随时终止进程,所以这是我的解决方案,它完全可以满足我的所有需求。希望它能对某人有所帮助:

import threading
import time
from datetime import datetime, timedelta


class MultiThreading:
    def __init__(self):
        self.thread = None
        self.started = False
        self.time = 5
        self.n = 0

    def task(self):
        self.n += 1
        print("Job task do your job {:d}".format(self.n))

    def threaded_program(self):
        while self.started == True:
            time.sleep(1)
            if datetime.now() >= self.datetime:
                self.task()
                self.datetime += timedelta(0, self.time)
            else:
                pass

    def run(self):
        self.datetime = datetime.now()
        self.datetime += timedelta(0, self.time)

        self.started = True
        self.task()
        self.thread = threading.Thread(target=self.threaded_program, args=())
        self.thread.start()

    def test(self):
        print("test")

    def stop(self):
        print("stopping")
        self.started = False
        self.thread.join()

使用方法很简单:

mt = MultiThreading()
mt.run()
mt.test()

因为睡眠时间设置为1秒,所以它可以每秒取消一次线程。

mt.stop()

其目的是为了在实验室中进行自动测量,其中需要不时操作由不同类控制的设备。这些周期随着实验的进展而变化。

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   601  
  华为IPD与传统研发模式的8大差异在快速变化的商业环境中,产品研发模式的选择直接决定了企业的市场响应速度和竞争力。华为作为全球领先的通信技术解决方案供应商,其成功在很大程度上得益于对产品研发模式的持续创新。华为引入并深度定制的集成产品开发(IPD)体系,相较于传统的研发模式,展现出了显著的差异和优势。本文将详细探讨华为...
IPD流程是谁发明的   7  
  如何通过IPD流程缩短产品上市时间?在快速变化的市场环境中,产品上市时间成为企业竞争力的关键因素之一。集成产品开发(IPD, Integrated Product Development)作为一种先进的产品研发管理方法,通过其结构化的流程设计和跨部门协作机制,显著缩短了产品上市时间,提高了市场响应速度。本文将深入探讨如...
华为IPD流程   9  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程图是连接创意、设计与市场成功的桥梁。它不仅是一个视觉工具,更是一种战略思维方式的体现,帮助团队高效协同,确保产品按时、按质、按量推向市场。尽管IPD流程图可能初看之下显得错综复杂,但只需掌握几个关键点,你便能轻松驾驭...
IPD开发流程管理   8  
  在项目管理领域,集成产品开发(IPD)流程被视为提升产品上市速度、增强团队协作与创新能力的重要工具。然而,尽管IPD流程拥有诸多优势,其实施过程中仍可能遭遇多种挑战,导致项目失败。本文旨在深入探讨八个常见的IPD流程失败原因,并提出相应的解决方法,以帮助项目管理者规避风险,确保项目成功。缺乏明确的项目目标与战略对齐IP...
IPD流程图   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用