函数调用超时

2024-11-20 08:44:00
admin
原创
6
摘要:问题描述:我正在调用 Python 中的一个函数,我知道它可能会停滞并迫使我重新启动脚本。我该如何调用该函数或将其包装在什么地方,以便如果它花费的时间超过 5 秒,脚本就会取消它并执行其他操作?解决方案 1:如果您在 UNIX 上运行,则可以使用信号包:In [1]: import signal # Reg...

问题描述:

我正在调用 Python 中的一个函数,我知道它可能会停滞并迫使我重新启动脚本。

我该如何调用该函数或将其包装在什么地方,以便如果它花费的时间超过 5 秒,脚本就会取消它并执行其他操作?


解决方案 1:

如果您在 UNIX 上运行,则可以使用信号包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

调用 10 秒后signal.alarm(10),处理程序被调用。这会引发一个异常,您可以从常规 Python 代码中拦截该异常。

这个模块不能很好地与线程配合(但谁可以呢?)

请注意,由于我们在超时发生时引发异常,它可能最终在函数内部被捕获并忽略,例如这样的一个函数:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

解决方案 2:

您可以使用它multiprocessing.Process来做到这一点。

代码

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

解决方案 3:

我该如何调用该函数或将其包装在什么地方,以便如果它花费的时间超过 5 秒,脚本就会取消它?

我发布了一个使用装饰器和 解决此问题/问题的要点threading.Timer。以下是其分解。

导入和设置兼容性

它已使用 Python 2 和 3 进行了测试。它也可以在 Unix/Linux 和 Windows 下运行。

首先是导入。无论 Python 版本如何,这些导入都试图保持代码的一致性:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

使用与版本无关的代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

现在我们已经从标准库导入了我们的功能。

exit_after装饰器

接下来我们需要一个函数来终止main()子线程:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

装饰器本身如下:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

用法

以下用法可以直接回答您关于 5 秒后退出的问题:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

第二个函数调用将不会完成,相反,该过程应该通过回溯退出!

KeyboardInterrupt并不总是停止休眠线程

请注意,在 Windows 上的 Python 2 中,睡眠并不总是会被键盘中断打断,例如:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

也不太可能中断扩展中运行的代码,除非它明确检查PyErr_CheckSignals(),请参阅 Cython、Python 和 KeyboardInterrupt 被忽略

无论如何,我都会避免让线程休眠超过一秒钟 - 这在处理器时间上需要很长时间。

我该如何调用该函数或将其包装在什么地方,以便如果它花费的时间超过 5 秒,脚本就会取消它并执行其他操作?

为了捕获它并执行其他操作,您可以捕获 KeyboardInterrupt。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

解决方案 4:

我有一个不同的建议,它是一个纯函数(具有与线程建议相同的 API),并且似乎运行良好(基于此线程上的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

解决方案 5:

我在搜索单元测试的超时调用时遇到了这个帖子。我在答案或第三方包中找不到任何简单的东西,所以我写了下面的装饰器,你可以直接把它放到代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后,就可以像这样简单地超时测试或任何您喜欢的功能:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

解决方案 6:

stopit在 pypi 上找到的这个包似乎可以很好地处理超时。

我喜欢装饰器,它为被装饰的函数@stopit.threading_timeoutable添加一个参数,按照你的期望执行,即停止该函数。timeout

在 pypi 上查看:https ://pypi.python.org/pypi/stopit

解决方案 7:

我是wrapt_timeout_decorator的作者。

乍一看,这里介绍的大多数解决方案在 Linux 下运行良好 - 因为我们有fork()-signals()但在 Windows 上情况看起来有点不同。当谈到 Linux 上的子线程时,您不能再使用信号了。

为了在 Windows 下生成一个进程,它需要是可 picklable 的——但许多装饰函数或类方法却不是。

所以您需要使用更好的 pickler,比如 dill 和 multiprocess(而不是 pickle 和 multiprocessing)——这就是为什么您不能使用ProcessPoolExecutor(或只能使用有限的功能)。

对于超时本身 - 您需要定义超时的含义 - 因为在 Windows 上,生成进程需要相当长的时间(并且无法确定)。在短超时的情况下,这可能很棘手。假设生成进程大约需要 0.5 秒(很容易!!!)。如果您给出 0.2 秒的超时,会发生什么?函数是否应在 0.5 + 0.2 秒后超时(因此让该方法运行 0.2 秒)?或者被调用的进程是否应在 0.2 秒后超时(在这种情况下,修饰函数将始终超时,因为在这段时间内它甚至没有生成)?

此外,嵌套装饰器可能很麻烦,而且您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,则需要考虑(并测试)所有这些。

其他问题是将异常传递回调用者,以及日志记录问题(如果在修饰函数中使用 - 不支持记录到另一个进程中的文件)

我试图涵盖所有边缘情况,您可以查看包 wrapt_timeout_decorator,或者至少测试受那里使用的单元测试启发的您自己的解决方案。

@Alexis Eggermont - 不幸的是我没有足够的积分来评论 - 也许其他人可以通知您 - 我想我解决了您的导入问题。

解决方案 8:

有很多建议,但没有一个使用concurrent.futures,我认为这是处理此问题最清晰的方法。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

阅读和维护超级简单。

我们创建一个池,提交一个进程,然后等待最多 5 秒钟,然后引发一个 TimeoutError,您可以根据需要捕获和处理该错误。

原生于 Python 3.2+ 并反向移植到 2.7(pip install futures)。

线程和进程之间的切换很简单,只需ProcessPoolExecutor用替换 即可ThreadPoolExecutor

如果您想在超时时终止进程,我建议您研究Pebble。

解决方案 9:

基于并增强 @piro 的答案,您可以构建一个 contextmanager。这允许非常易读的代码,它将在成功运行后禁用 alamam 信号(设置 signal.alarm(0))

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise TimeoutError(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

使用示例:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds

解决方案 10:

很棒、易于使用且可靠的PyPi项目timeout-decoratorhttps://pypi.org/project/timeout-decorator/

安装

pip install timeout-decorator

用法

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

解决方案 11:

timeout-decorator不适用于 Windows 系统,因为 Windows 支持不佳signal

如果你在 Windows 系统中使用 timeout-decorator,你将得到以下内容

AttributeError: module 'signal' has no attribute 'SIGALRM'

有人建议使用use_signals=False但对我来说不起作用。

作者@bitranox 创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

给出以下异常:

TimeoutError: Function mytest timed out after 5 seconds

解决方案 12:

为了以防万一它对任何人都有帮助,基于@piro 的回答,我制作了一个函数装饰器:

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)

            return result

        return time_limited

    return wrapper

在具有超时功能的函数上使用包装器20 seconds看起来像这样:

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")

解决方案 13:

亮点

  • 引发TimeoutError使用异常来警告超时 - 可以轻松修改

  • 跨平台:Windows 和 Mac OS X

  • 兼容性:Python 3.6+(我也在 Python 2.7 上进行了测试,只需进行少量语法调整即可运行)

有关并行图的完整解释和扩展,请参阅此处https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小示例

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

和预期的一样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

笔记

由于该方式有效,您将需要在函数内部进行导入dill

这也意味着如果目标函数中有导入,这些函数可能不兼容doctest。您将遇到未找到的问题__import__

解决方案 14:

我们可以使用信号来实现相同的效果。我认为下面的示例对您有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

解决方案 15:

Tim Savannah 的func_timeout 包对我来说很好用。

安装:

pip install func_timeout

用法:

import time
from func_timeout import func_timeout, FunctionTimedOut

def my_func(n):
    time.sleep(n)

time_to_sleep = 10

# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})

# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))

解决方案 16:

使用 asyncio 的另一种解决方案:

如果您想要取消后台任务而不只是在正在运行的主代码上超时,那么您需要从主线程进行明确的通信以要求任务代码取消,例如 threading.Event()

import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__ #TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items,nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)

解决方案 17:

这是一个简单易用的装饰器,如果函数的执行时间到期,它会返回给定的默认值,这是受到该问题的第一个答案的启发:

import signal
from functools import wraps
import time

def timeout(seconds, default=None):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            def signal_handler(signum, frame):
                raise TimeoutError("Timed out!")
            # Set up the signal handler for timeout
            signal.signal(signal.SIGALRM, signal_handler)

            # Set the initial alarm for the integer part of seconds
            signal.setitimer(signal.ITIMER_REAL, seconds)

            
            try:
                result = func(*args, **kwargs)
            except TimeoutError:
                return default
            finally:
                signal.alarm(0)
            
            return result
        
        return wrapper
    
    return decorator

@timeout(0.2, default="Timeout!")
def long_function_call(meal):
    time.sleep(3)
    return f"I have executed fully, {meal} is ready"

@timeout(1.3, default="Timeout!")
def less_long_function_call(meal):
    time.sleep(1)
    return f"I have executed fully, {meal} is ready"

result = long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 0.2 seconds
result = less_long_function_call("bacon")
print(result)  # Prints "Timeout!" if the function execution exceeds 1.3 seconds

解决方案 18:

#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

解决方案 19:

我需要可嵌套的定时中断(SIGALARM 无法实现),并且不会被 time.sleep 阻止(基于线程的方法无法实现)。我最终从这里复制并稍微修改了代码: http: //code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

解决方案 20:

我也遇到过同样的问题,但我的情况是需要在子线程上工作,信号对我来说不起作用,所以我编写了一个 python 包:timeout-timer 来解决这个问题,支持用作上下文或装饰器,使用信号或子线程模块来触发超时中断:

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2

更多内容请见:https://github.com/dozysun/timeout-timer

解决方案 21:

这是一个简单的例子,运行一个带有超时的方法,如果成功则检索其值。

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
    else:
        print(f"The worker completed and returned: {queue.get()}")

解决方案 22:

这是对给定的基于线程的解决方案的轻微改进。

下面的代码支持异常

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

调用它并设置 5 秒超时:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

解决方案 23:

这是一个 POSIX 版本,它结合了许多以前的答案,以提供以下功能:

  1. 子进程阻止执行。

  2. 在类成员函数中使用超时函数。

  3. 对终止时间有严格要求。

以下是代码和一些测试用例:

import threading
import signal
import os
import time

class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result

### Test cases
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)


t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)


class X:
    def __init__(self):
        self.value = 0

    def set(self, v):
        self.value = v


x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9

解决方案 24:

如果工作未完成,我打算终止该进程,使用线程和进程来实现这一点。

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a,b,c):
    for _ in range(2):
        print('very time consuming sleep')
        sleep(1)

    return a+b+c

# test case 2
def worker_2(in_name):
    for _ in range(10):
        print('very time consuming sleep')
        sleep(1)

    return 'hello '+in_name

实际类作为上下文管理器

class FuncTimer():
    def __init__(self,fn,args,runtime):
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            self.queue.put(future.result())

    def  __enter__(self):
        return self

    def start_run(self):
        self.process.start()
        self.process.join(timeout=self.runtime)
        if self.process.exitcode is None:
            self.process.kill()
        if self.process.exitcode is None:
            out_res = None
            print('killed premature')
        else:
            out_res = self.queue.get()
        return out_res


    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.process.kill()

如何使用

print('testing case 1') 
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)
相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   601  
  华为IPD与传统研发模式的8大差异在快速变化的商业环境中,产品研发模式的选择直接决定了企业的市场响应速度和竞争力。华为作为全球领先的通信技术解决方案供应商,其成功在很大程度上得益于对产品研发模式的持续创新。华为引入并深度定制的集成产品开发(IPD)体系,相较于传统的研发模式,展现出了显著的差异和优势。本文将详细探讨华为...
IPD流程是谁发明的   7  
  如何通过IPD流程缩短产品上市时间?在快速变化的市场环境中,产品上市时间成为企业竞争力的关键因素之一。集成产品开发(IPD, Integrated Product Development)作为一种先进的产品研发管理方法,通过其结构化的流程设计和跨部门协作机制,显著缩短了产品上市时间,提高了市场响应速度。本文将深入探讨如...
华为IPD流程   9  
  在项目管理领域,IPD(Integrated Product Development,集成产品开发)流程图是连接创意、设计与市场成功的桥梁。它不仅是一个视觉工具,更是一种战略思维方式的体现,帮助团队高效协同,确保产品按时、按质、按量推向市场。尽管IPD流程图可能初看之下显得错综复杂,但只需掌握几个关键点,你便能轻松驾驭...
IPD开发流程管理   8  
  在项目管理领域,集成产品开发(IPD)流程被视为提升产品上市速度、增强团队协作与创新能力的重要工具。然而,尽管IPD流程拥有诸多优势,其实施过程中仍可能遭遇多种挑战,导致项目失败。本文旨在深入探讨八个常见的IPD流程失败原因,并提出相应的解决方法,以帮助项目管理者规避风险,确保项目成功。缺乏明确的项目目标与战略对齐IP...
IPD流程图   8  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用