使用 Python 的多处理池进行键盘中断

2025-01-20 09:07:00
admin
原创
72
摘要:问题描述:如何使用 python 的多处理池处理 KeyboardInterrupt 事件?这是一个简单的例子:from multiprocessing import Pool from time import sleep from sys import exit def slowly_square(i):...

问题描述:

如何使用 python 的多处理池处理 KeyboardInterrupt 事件?这是一个简单的例子:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "
Finally, here are the results: ", results

if __name__ == "__main__":
    go()

运行上述代码时,KeyboardInterrupt我按下 时会引发^C,但该过程只是挂在这一点,我必须从外部将其终止。

我希望能够^C随时按下并让所有进程正常退出。


解决方案 1:

这是一个 Python 错误。在 threading.Condition.wait() 中等待条件时,键盘中断永远不会被发送。重现:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

KeyboardInterrupt 异常直到 wait() 返回时才会被传递,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt 几乎肯定会中断条件等待。

请注意,如果指定了超时,则不会发生这种情况;cond.wait(1) 将立即收到中断。因此,一种解决方法是指定超时。为此,请替换

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似。

解决方案 2:

从我最近发现的情况来看,最好的解决方案是将工作进程设置为完全忽略 SIGINT,并将所有清理代码限制在父进程中。这解决了空闲和繁忙工作进程的问题,并且不需要在子进程中编写错误处理代码。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

解释和完整的示例代码可分别在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt找到。

解决方案 3:

由于某些原因,只有从基Exception类继承的异常才会被正常处理。作为一种解决方法,你可以重新引发你的KeyboardInterrupt实例Exception

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

正常情况下你会得到以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

因此如果你点击^C,你将获得:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

解决方案 4:

投票的答案并未解决核心问题,却产生了类似的副作用。

多处理库的作者 Jesse Nollermultiprocessing.Pool在一篇旧博客文章中解释了如何在使用时正确处理 CTRL+C 。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

解决方案 5:

其中许多答案都很旧,并且/或者如果您正在执行诸如 之类的方法Pool.map,它们似乎不适用于Windows 上更高版本的 Python(我正在运行 3.8.5) ,该方法会阻塞直到所有提交的任务都完成。以下是我的解决方案。

  1. signal.signal(signal.SIGINT, signal.SIG_IGN)在主进程中发出调用以完全忽略 Ctrl-C。

  2. 处理池将使用池初始化程序进行初始化,该程序将按如下方式初始化每个处理器:全局变量ctrl_c_entered将设置为,并将发出False调用以最初忽略 Ctrl-C。此调用的返回值将被保存;这是原始的默认处理程序,重新建立后可处理异常。signal.signal(signal.SIGINT, signal.SIG_IGN)`KyboardInterrupt`

  3. 装饰器handle_ctrl_c可用于装饰应在输入 Ctrl-C 时立即退出的多处理函数和方法。此装饰器将测试是否ctrl_c_entered设置了全局标志,如果设置了,则甚至不会运行该函数/方法,而是返回一个KeyboardInterrupt异常实例。否则,将为 建立一个 try/catch 处理程序KeyboardInterrupt,并调用装饰的函数/方法。如果输入了 Ctrl-C,全局ctrl_c_entered将设置为True,并返回一个KeyboardInterrupt异常实例。无论如何,在返回之前,装饰器将重新建立 SIG_IGN 处理程序。

实质上,所有已提交的任务都将被允许启动,但KeyBoardInterrupt一旦输入 Ctrl-C,将立即终止并返回异常值。主进程可以测试返回值是否存在这样的返回值,以检测是否输入了 Ctrl-C。

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

印刷:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]

解决方案 6:

通常,这种简单的结构适用于Ctrl- Con Pool:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

正如一些类似的帖子所述:

在 Python 中捕获键盘中断,无需 try-except

解决方案 7:

似乎有两个问题导致多处理过程中的异常令人烦恼。第一个(Glenn 指出)是您需要使用map_async超时而不是map才能获得即时响应(即不完成整个列表的处理)。第二个(Andrey 指出)是多处理不会捕获未继承自Exception(例如SystemExit)的异常。所以这是我的解决方案,可以解决这两个问题:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):
%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

解决方案 8:

我是 Python 新手。我到处寻找答案,偶然发现了这个和其他一些博客和 YouTube 视频。我尝试复制粘贴上述作者的代码,并在 Windows 7 64 位上的 Python 2.7.13 上重现它。它接近我想要实现的目标。

我让子进程忽略 ControlC 并终止父进程。看来绕过子进程确实可以避免这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

开始的部分pool.terminate()似乎永远都不会执行。

解决方案 9:

我发现,目前最好的解决方案是不使用 multiprocessing.pool 功能,而是推出自己的池功能。我提供了一个使用 apply_async 演示错误的示例,以及一个演示如何完全避免使用池功能的示例。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

解决方案 10:

您可以尝试使用 Pool 对象的 apply_async 方法,如下所示:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

输出:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

这种方法的优点是中断之前处理的结果将返回到结果字典中:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

解决方案 11:

奇怪的是,看起来你KeyboardInterrupt还必须处理子代中的。我原本以为这会按书面形式工作……尝试更改slowly_square为:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

这应该可以如您所期望的那样工作。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   1565  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1354  
  信创国产芯片作为信息技术创新的核心领域,对于推动国家自主可控生态建设具有至关重要的意义。在全球科技竞争日益激烈的背景下,实现信息技术的自主可控,摆脱对国外技术的依赖,已成为保障国家信息安全和产业可持续发展的关键。国产芯片作为信创产业的基石,其发展水平直接影响着整个信创生态的构建与完善。通过不断提升国产芯片的技术实力、产...
国产信创系统   21  
  信创生态建设旨在实现信息技术领域的自主创新和安全可控,涵盖了从硬件到软件的全产业链。随着数字化转型的加速,信创生态建设的重要性日益凸显,它不仅关乎国家的信息安全,更是推动产业升级和经济高质量发展的关键力量。然而,在推进信创生态建设的过程中,面临着诸多复杂且严峻的挑战,需要深入剖析并寻找切实可行的解决方案。技术创新难题技...
信创操作系统   27  
  信创产业作为国家信息技术创新发展的重要领域,对于保障国家信息安全、推动产业升级具有关键意义。而国产芯片作为信创产业的核心基石,其研发进展备受关注。在信创国产芯片的研发征程中,面临着诸多复杂且艰巨的难点,这些难点犹如一道道关卡,阻碍着国产芯片的快速发展。然而,科研人员和相关企业并未退缩,积极探索并提出了一系列切实可行的解...
国产化替代产品目录   28  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用