使用 Python 的多处理池进行键盘中断
- 2025-01-20 09:07:00
- admin 原创
- 72
问题描述:
如何使用 python 的多处理池处理 KeyboardInterrupt 事件?这是一个简单的例子:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "
Finally, here are the results: ", results
if __name__ == "__main__":
go()
运行上述代码时,KeyboardInterrupt
我按下 时会引发^C
,但该过程只是挂在这一点,我必须从外部将其终止。
我希望能够^C
随时按下并让所有进程正常退出。
解决方案 1:
这是一个 Python 错误。在 threading.Condition.wait() 中等待条件时,键盘中断永远不会被发送。重现:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
KeyboardInterrupt 异常直到 wait() 返回时才会被传递,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt 几乎肯定会中断条件等待。
请注意,如果指定了超时,则不会发生这种情况;cond.wait(1) 将立即收到中断。因此,一种解决方法是指定超时。为此,请替换
results = pool.map(slowly_square, range(40))
和
results = pool.map_async(slowly_square, range(40)).get(9999999)
或类似。
解决方案 2:
从我最近发现的情况来看,最好的解决方案是将工作进程设置为完全忽略 SIGINT,并将所有清理代码限制在父进程中。这解决了空闲和繁忙工作进程的问题,并且不需要在子进程中编写错误处理代码。
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
解释和完整的示例代码可分别在http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/和http://github.com/jreese/multiprocessing-keyboardinterrupt找到。
解决方案 3:
由于某些原因,只有从基Exception
类继承的异常才会被正常处理。作为一种解决方法,你可以重新引发你的KeyboardInterrupt
实例Exception
:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
正常情况下你会得到以下输出:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
因此如果你点击^C
,你将获得:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
解决方案 4:
投票的答案并未解决核心问题,却产生了类似的副作用。
多处理库的作者 Jesse Nollermultiprocessing.Pool
在一篇旧博客文章中解释了如何在使用时正确处理 CTRL+C 。
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
解决方案 5:
其中许多答案都很旧,并且/或者如果您正在执行诸如 之类的方法Pool.map
,它们似乎不适用于Windows 上更高版本的 Python(我正在运行 3.8.5) ,该方法会阻塞直到所有提交的任务都完成。以下是我的解决方案。
signal.signal(signal.SIGINT, signal.SIG_IGN)
在主进程中发出调用以完全忽略 Ctrl-C。处理池将使用池初始化程序进行初始化,该程序将按如下方式初始化每个处理器:全局变量
ctrl_c_entered
将设置为,并将发出False
调用以最初忽略 Ctrl-C。此调用的返回值将被保存;这是原始的默认处理程序,重新建立后可处理异常。signal.signal(signal.SIGINT, signal.SIG_IGN)
`KyboardInterrupt`装饰器
handle_ctrl_c
可用于装饰应在输入 Ctrl-C 时立即退出的多处理函数和方法。此装饰器将测试是否ctrl_c_entered
设置了全局标志,如果设置了,则甚至不会运行该函数/方法,而是返回一个KeyboardInterrupt
异常实例。否则,将为 建立一个 try/catch 处理程序KeyboardInterrupt
,并调用装饰的函数/方法。如果输入了 Ctrl-C,全局ctrl_c_entered
将设置为True
,并返回一个KeyboardInterrupt
异常实例。无论如何,在返回之前,装饰器将重新建立 SIG_IGN 处理程序。
实质上,所有已提交的任务都将被允许启动,但KeyBoardInterrupt
一旦输入 Ctrl-C,将立即终止并返回异常值。主进程可以测试返回值是否存在这样的返回值,以检测是否输入了 Ctrl-C。
from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps
def handle_ctrl_c(func):
@wraps(func)
def wrapper(*args, **kwargs):
global ctrl_c_entered
if not ctrl_c_entered:
signal.signal(signal.SIGINT, default_sigint_handler) # the default
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
ctrl_c_entered = True
return KeyboardInterrupt()
finally:
signal.signal(signal.SIGINT, pool_ctrl_c_handler)
else:
return KeyboardInterrupt()
return wrapper
@handle_ctrl_c
def slowly_square(i):
sleep(1)
return i*i
def pool_ctrl_c_handler(*args, **kwargs):
global ctrl_c_entered
ctrl_c_entered = True
def init_pool():
# set global variable for each process in the pool:
global ctrl_c_entered
global default_sigint_handler
ctrl_c_entered = False
default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
def main():
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=init_pool)
results = pool.map(slowly_square, range(10))
if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
print('Ctrl-C was entered.')
print(results)
pool.close()
pool.join()
if __name__ == '__main__':
main()
印刷:
Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]
解决方案 6:
通常,这种简单的结构适用于Ctrl
- C
on Pool:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
正如一些类似的帖子所述:
在 Python 中捕获键盘中断,无需 try-except
解决方案 7:
似乎有两个问题导致多处理过程中的异常令人烦恼。第一个(Glenn 指出)是您需要使用map_async
超时而不是map
才能获得即时响应(即不完成整个列表的处理)。第二个(Andrey 指出)是多处理不会捕获未继承自Exception
(例如SystemExit
)的异常。所以这是我的解决方案,可以解决这两个问题:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):
%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
解决方案 8:
我是 Python 新手。我到处寻找答案,偶然发现了这个和其他一些博客和 YouTube 视频。我尝试复制粘贴上述作者的代码,并在 Windows 7 64 位上的 Python 2.7.13 上重现它。它接近我想要实现的目标。
我让子进程忽略 ControlC 并终止父进程。看来绕过子进程确实可以避免这个问题。
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
开始的部分pool.terminate()
似乎永远都不会执行。
解决方案 9:
我发现,目前最好的解决方案是不使用 multiprocessing.pool 功能,而是推出自己的池功能。我提供了一个使用 apply_async 演示错误的示例,以及一个演示如何完全避免使用池功能的示例。
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
解决方案 10:
您可以尝试使用 Pool 对象的 apply_async 方法,如下所示:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
输出:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
这种方法的优点是中断之前处理的结果将返回到结果字典中:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
解决方案 11:
奇怪的是,看起来你KeyboardInterrupt
还必须处理子代中的。我原本以为这会按书面形式工作……尝试更改slowly_square
为:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
这应该可以如您所期望的那样工作。