如何限制函数调用的执行时间?[重复]

2024-12-23 08:43:00
admin
原创
103
摘要:问题描述:我的代码中有一个与套接字相关的函数调用,该函数来自另一个模块,因此不受我的控制,问题是它偶尔会阻塞数小时,这是完全不可接受的,我该如何限制代码中的函数执行时间?我猜解决方案必须利用另一个线程。解决方案 1:@rik.the.vik 的答案的一个改进是使用with语句为超时函数提供一些语法糖:impo...

问题描述:

我的代码中有一个与套接字相关的函数调用,该函数来自另一个模块,因此不受我的控制,问题是它偶尔会阻塞数小时,这是完全不可接受的,我该如何限制代码中的函数执行时间?我猜解决方案必须利用另一个线程。


解决方案 1:

@rik.the.vik 的答案的一个改进是使用with语句为超时函数提供一些语法糖:

import signal
from contextlib import contextmanager

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


try:
    with time_limit(10):
        long_function_call()
except TimeoutException as e:
    print("Timed out!")

解决方案 2:

我不确定这有多跨平台,但使用信号和警报可能是看待这个问题的好方法。只需一点工作,您就可以使其完全通用,并可在任何情况下使用。

http://docs.python.org/library/signal.html

所以你的代码看起来会像这样。

import signal

def signal_handler(signum, frame):
    raise Exception("Timed out!")

signal.signal(signal.SIGALRM, signal_handler)
signal.alarm(10)   # Ten seconds
try:
    long_function_call()
except Exception, msg:
    print "Timed out!"

解决方案 3:

这是在 Linux/OSX 中限制函数运行时间的方法。如果您不想使用线程,并希望程序等到函数结束或时间限制到期,可以使用这种方法。

from multiprocessing import Process
from time import sleep

def f(time):
    sleep(time)


def run_with_limited_time(func, args, kwargs, time):
    """Runs a function with time limit

    :param func: The function to run
    :param args: The functions args, given as tuple
    :param kwargs: The functions keywords, given as dict
    :param time: The time limit in seconds
    :return: True if the function ended successfully. False if it was terminated.
    """
    p = Process(target=func, args=args, kwargs=kwargs)
    p.start()
    p.join(time)
    if p.is_alive():
        p.terminate()
        return False

    return True


if __name__ == '__main__':
    print run_with_limited_time(f, (1.5, ), {}, 2.5) # True
    print run_with_limited_time(f, (3.5, ), {}, 2.5) # False

解决方案 4:

我更喜欢上下文管理器方法,因为它允许在一个语句中执行多个 python 语句with time_limit。由于 windows 系统没有SIGALARM,因此更便携且可能更直接的方法可能是使用Timer

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception):
    def __init__(self, msg=''):
        self.msg = msg

@contextmanager
def time_limit(seconds, msg=''):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        raise TimeoutException("Timed out for operation {}".format(msg))
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

import time
# ends after 5 seconds
with time_limit(5, 'sleep'):
    for i in range(10):
        time.sleep(1)

# this will actually end after 10 seconds
with time_limit(5, 'sleep'):
    time.sleep(10)

这里的关键技术是使用 来_thread.interrupt_main从定时器线程中断主线程。需要注意的是,主线程并不总是能够快速响应 引发的KeyboardInterruptTimer例如,time.sleep()调用系统函数,因此KeyboardInterrupt将在调用后处理sleep

解决方案 5:

这里:获得所需效果的简单方法:

https://pypi.org/project/func-timeout

这救了我的命。

现在举一个它如何工作的例子:假设您有一个很大的项目列表需要处理,并且您正在迭代这些项目。但是,由于某种奇怪的原因,您的函数卡在了项目 n 上,没有引发异常。您需要处理其他项目,越多越好。在这种情况下,您可以为处理每个项目设置超时:

import time
import func_timeout


def my_function(n):
    """Sleep for n seconds and return n squared."""
    print(f'Processing {n}')
    time.sleep(n)
    return n**2


def main_controller(max_wait_time, all_data):
    """
    Feed my_function with a list of items to process (all_data).

    However, if max_wait_time is exceeded, return the item and a fail info.
    """
    res = []
    for data in all_data:
        try:
            my_square = func_timeout.func_timeout(
                max_wait_time, my_function, args=[data]
                )
            res.append((my_square, 'processed'))
        except func_timeout.FunctionTimedOut:
            print('error')
            res.append((data, 'fail'))
            continue

    return res


timeout_time = 2.1  # my time limit
all_data = range(1, 10)  # the data to be processed

res = main_controller(timeout_time, all_data)
print(res)

解决方案 6:

在信号处理程序中执行此操作很危险:在引发异常时,您可能位于异常处理程序内部,并使程序处于损坏状态。例如,

def function_with_enforced_timeout():
  f = open_temporary_file()
  try:
   ...
  finally:
   here()
   unlink(f.filename)

如果在此处引发异常(),则临时文件将永远不会被删除。

这里的解决方案是将异步异常推迟到代码不在异常处理代码(except 或 finally 块)内,但 Python 不会这样做。

请注意,这不会在执行本机代码时中断任何事情;它只会在函数返回时中断它,因此这可能对这种特殊情况没有帮助。(SIGALRM 本身可能会中断阻塞的调用 - 但套接字代码通常只是在 EINTR 之后重试。)

使用线程执行此操作是一个更好的主意,因为它比信号更易于移植。由于您启动了一个工作线程并阻塞直到它完成,因此不存在常见的并发问题。不幸的是,在 Python 中无法将异常异步传递给另一个线程(其他线程 API 可以做到这一点)。在异常处理程序期间发送异常也会有同样的问题,并且需要同样的修复。

解决方案 7:

在任何语言中,唯一“安全”的方法是使用辅助进程来执行超时操作,否则您需要以某种方式构建代码,使其能够自行安全超时,例如通过检查循环或类似方法中经过的时间。如果无法更改方法,则线程是不够的。

为什么?因为这样做会冒着让事情处于糟糕状态的风险。如果线程在方法执行过程中被简单地终止,则持有的锁等将只会被持有,无法释放。

所以要看进程方式,不要线程方式。

解决方案 8:

我通常更喜欢使用 @josh-lee 建议的 contextmanager

但如果有人有兴趣将其作为装饰器实现,这里有一个替代方案。

它看起来是这样的:

import time
from timeout import timeout

class Test(object):
    @timeout(2)
    def test_a(self, foo, bar):
        print foo
        time.sleep(1)
        print bar
        return 'A Done'

    @timeout(2)
    def test_b(self, foo, bar):
        print foo
        time.sleep(3)
        print bar
        return 'B Done'

t = Test()
print t.test_a('python', 'rocks')
print t.test_b('timing', 'out')

这是timeout.py模块:

import threading

class TimeoutError(Exception):
    pass

class InterruptableThread(threading.Thread):
    def __init__(self, func, *args, **kwargs):
        threading.Thread.__init__(self)
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self._result = None

    def run(self):
        self._result = self._func(*self._args, **self._kwargs)

    @property
    def result(self):
        return self._result


class timeout(object):
    def __init__(self, sec):
        self._sec = sec

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            it = InterruptableThread(f, *args, **kwargs)
            it.start()
            it.join(self._sec)
            if not it.is_alive():
                return it.result
            raise TimeoutError('execution expired')
        return wrapped_f

输出:

python
rocks
A Done
timing
Traceback (most recent call last):
  ...
timeout.TimeoutError: execution expired
out

请注意,即使TimeoutError抛出了,修饰方法仍会继续在另一个线程中运行。如果您还想“停止”此线程,请参阅:有没有办法在 Python 中终止线程?

解决方案 9:

您不必使用线程。您可以使用另一个进程来完成阻塞工作,例如,可以使用子进程模块。如果您想在程序的不同部分之间共享数据结构,那么Twisted是一个很好的库,可以让您控制这一点,如果您关心阻塞并且预计会经常遇到这种麻烦,我建议您使用它。Twisted 的坏消息是您必须重写代码以避免任何阻塞,并且有一个相当的学习曲线。

可以使用线程来避免阻塞,但我认为这是最后的手段,因为它会让您陷入痛苦之中。在考虑在生产中使用线程之前,请先阅读一本关于并发的好书,例如 Jean Bacon 的“并发系统”。我与一群使用线程做非常酷的高性能工作的人一起工作,除非我们真的需要,否则我们不会将线程引入项目。

解决方案 10:

这是一个超时函数,我想我是通过谷歌找到的,它对我很有用。

来自:
http: //code.activestate.com/recipes/473878/

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    '''This function will spwan a thread and run the given function using the args, kwargs and 
    return the given default value if the timeout_duration is exceeded 
    ''' 
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            try:
                self.result = func(*args, **kwargs)
            except:
                self.result = default
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return it.result
    else:
        return it.result   

解决方案 11:

@user2283347 的方法已经过测试,但我们希望删除回溯消息。使用Ctrl-C 上的 Remove traceback in Python 中的 pass 技巧,修改后的代码为:

from contextlib import contextmanager
import threading
import _thread

class TimeoutException(Exception): pass

@contextmanager
def time_limit(seconds):
    timer = threading.Timer(seconds, lambda: _thread.interrupt_main())
    timer.start()
    try:
        yield
    except KeyboardInterrupt:
        pass     
    finally:
        # if the action ends in specified time, timer is canceled
        timer.cancel()

def timeout_svm_score(i):
     #from sklearn import svm
     #import numpy as np
     #from IPython.core.display import display
     #%store -r names X Y
     clf = svm.SVC(kernel='linear', C=1).fit(np.nan_to_num(X[[names[i]]]), Y)
     score = clf.score(np.nan_to_num(X[[names[i]]]),Y)
     #scoressvm.append((score, names[i]))
     display((score, names[i])) 
     
%%time
with time_limit(5):
    i=0
    timeout_svm_score(i)
#Wall time: 14.2 s

%%time
with time_limit(20):
    i=0
    timeout_svm_score(i)
#(0.04541284403669725, '计划飞行时间')
#Wall time: 16.1 s

%%time
with time_limit(5):
    i=14
    timeout_svm_score(i)
#Wall time: 5h 43min 41s

我们可以看出,这种方法可能需要很长时间才能中断计算,我们要求5秒,但它在5个小时后才完成。

解决方案 12:

使用简单的装饰器

这是我研究了以上答案后制作的版本。非常简单。

def function_timeout(seconds: int):
    """Wrapper of Decorator to pass arguments"""

    def decorator(func):
        @contextmanager
        def time_limit(seconds_):
            def signal_handler(signum, frame):  # noqa
                raise TimeoutException(f"Timed out in {seconds_} seconds!")

            signal.signal(signal.SIGALRM, signal_handler)
            signal.alarm(seconds_)
            try:
                yield
            finally:
                signal.alarm(0)

        @wraps(func)
        def wrapper(*args, **kwargs):
            with time_limit(seconds):
                return func(*args, **kwargs)

        return wrapper

    return decorator

如何使用?

@function_timeout(seconds=5)
def my_naughty_function():
    while True:
        print("Try to stop me ;-p")

当然,如果它在单独的文件中,请不要忘记导入该函数。

解决方案 13:

该代码适用于带有 python 3.7.3 的 Windows Server Datacenter 2016,我没有在 Unix 上测试过,在混合了 Google 和 StackOverflow 的一些答案之后,它最终对我起作用了,如下所示:

from multiprocessing import Process, Lock
import time
import os

def f(lock,id,sleepTime):
    lock.acquire()
    print("I'm P"+str(id)+" Process ID: "+str(os.getpid()))
    lock.release()
    time.sleep(sleepTime)   #sleeps for some time
    print("Process: "+str(id)+" took this much time:"+str(sleepTime))
    time.sleep(sleepTime)
    print("Process: "+str(id)+" took this much time:"+str(sleepTime*2))

if __name__ == '__main__':
    timeout_function=float(9) # 9 seconds for max function time
    print("Main Process ID: "+str(os.getpid()))
    lock=Lock()
    p1=Process(target=f, args=(lock,1,6,))   #Here you can change from 6 to 3 for instance, so you can watch the behavior
    start=time.time()
    print(type(start))
    p1.start()
    if p1.is_alive():
        print("process running a")
    else:
        print("process not running a")
    while p1.is_alive():
        timeout=time.time()
        if timeout-start > timeout_function:
            p1.terminate()
            print("process terminated")
        print("watching, time passed: "+str(timeout-start) )
        time.sleep(1)
    if p1.is_alive():
        print("process running b")
    else:
        print("process not running b")
    p1.join()
    if p1.is_alive():
        print("process running c")
    else:
        print("process not running c")
    end=time.time()
    print("I am the main process, the two processes are done")
    print("Time taken:- "+str(end-start)+" secs")   #MainProcess terminates at approx ~ 5 secs.
    time.sleep(5) # To see if on Task Manager the child process is really being terminated, and it is
    print("finishing")

主要代码来自此链接:
使用 python(windows) 创建两个子进程

然后我用来.terminate()终止子进程。您可以看到函数 f 调用了 2 次打印,一次是在 5 秒后,另一次是在 10 秒后。但是,在 7 秒的睡眠和终止() 之后,它不会显示最后一次打印。

它对我有用,希望它有帮助!

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1120  
  IPD(Integrated Product Development,集成产品开发)流程是一种广泛应用于高科技和制造业的产品开发方法论。它通过跨职能团队的紧密协作,将产品开发周期缩短,同时提高产品质量和市场成功率。在IPD流程中,CDCP(Concept Decision Checkpoint,概念决策检查点)是一个关...
IPD培训课程   75  
  研发IPD(集成产品开发)流程作为一种系统化的产品开发方法,已经在许多行业中得到广泛应用。它不仅能够提升产品开发的效率和质量,还能够通过优化流程和资源分配,显著提高客户满意度。客户满意度是企业长期成功的关键因素之一,而IPD流程通过其独特的结构和机制,能够确保产品从概念到市场交付的每个环节都围绕客户需求展开。本文将深入...
IPD流程   66  
  IPD(Integrated Product Development,集成产品开发)流程是一种以跨职能团队协作为核心的产品开发方法,旨在通过优化资源分配、提高沟通效率以及减少返工,从而缩短项目周期并提升产品质量。随着企业对产品上市速度的要求越来越高,IPD流程的应用价值愈发凸显。通过整合产品开发过程中的各个环节,IPD...
IPD项目管理咨询   76  
  跨部门沟通是企业运营中不可或缺的一环,尤其在复杂的产品开发过程中,不同部门之间的协作效率直接影响项目的成败。集成产品开发(IPD)作为一种系统化的项目管理方法,旨在通过优化流程和增强团队协作来提升产品开发的效率和质量。然而,跨部门沟通的复杂性往往成为IPD实施中的一大挑战。部门之间的目标差异、信息不对称以及沟通渠道不畅...
IPD是什么意思   70  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用