How should I log while using multiprocessing in Python?
- 2025-01-15 08:45:00
- admin (original)
- 114
Problem description:
Right now, I have a central module in a framework that spawns multiple processes using the Python 2.6 multiprocessing module. Because it uses multiprocessing, there is a module-level, multiprocessing-aware logger: LOG = multiprocessing.get_logger().
Per the docs, this logger (EDIT) does not have process-shared locks, so that you don't garble up sys.stderr (or any file handle) by having multiple processes writing to it simultaneously.
The issue I have now is that the other modules in the framework are not multiprocessing-aware. The way I see it, I need to make all dependencies on this central module use multiprocessing-aware logging. That's annoying within the framework, to say nothing of all the clients of the framework. Are there alternatives I'm not thinking of?
Solution 1:
I just now wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.
(Note: this is hardcoded to RotatingFileHandler, which is my own use case.)
Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, github at https://github.com/jruere/multiprocessing-logging
Update: Implementation!
This now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months now, and the current version below works without issue.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
Solution 2:
The only way to deal with this non-intrusively is to:
Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
Your controller process can then do one of the following:
* **If using disk files:** coalesce the log files at the end of the run, sorted by timestamp (a minimal sketch of this variant is shown after this list)
* **If using pipes (recommended):** coalesce log entries on the fly from all pipes into a central log file. (E.g., periodically `select` on the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat.)
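To make the disk-file variant concrete, here is a minimal sketch, assuming each worker has already written lines starting with an ISO-8601 timestamp to its own file; the worker-*.log / combined.log names are just illustrative choices:
import glob
import heapq

def merge_worker_logs(pattern="worker-*.log", out_path="combined.log"):
    # Each per-worker file is already in timestamp order, so merging the
    # open files line by line yields a globally ordered central log.
    files = [open(name) for name in sorted(glob.glob(pattern))]
    try:
        with open(out_path, "w") as out:
            for line in heapq.merge(*files):
                out.write(line)
    finally:
        for f in files:
            f.close()

if __name__ == "__main__":
    merge_worker_logs()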
Solution 3:
The QueueHandler is native in Python 3.2+ and handles exactly this. It is easily replicated in earlier versions.
The Python docs have two complete examples: Logging to a single file from multiple processes.
Each process (including the parent process) puts its logging on a Queue, and then a listener thread or process (one example is provided for each) picks those records up and writes them all to a file - no risk of corruption or garbling. A minimal sketch of the listener-process variant is shown below.
For those using Python < 3.2, just import logutils (which is the same as the Python 3.2 native code).
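For reference, here is a minimal sketch of the listener-process variant following the pattern the docs describe; the worker/listener names, the mp.log filename and the sentinel handling are my own illustrative choices, not copied from the docs:
import logging
import logging.handlers
import multiprocessing

def listener(queue):
    # The only process that ever touches the log file.
    root = logging.getLogger()
    handler = logging.FileHandler("mp.log")
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(processName)s %(levelname)s %(message)s"))
    root.addHandler(handler)
    while True:
        record = queue.get()
        if record is None:                      # sentinel: shut down
            break
        logging.getLogger(record.name).handle(record)

def worker(queue, i):
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.handlers.QueueHandler(queue))   # the only handler needed here
    root.info("hello from worker %d", i)

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    lp = multiprocessing.Process(target=listener, args=(queue,))
    lp.start()
    workers = [multiprocessing.Process(target=worker, args=(queue, i)) for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    queue.put(None)                             # stop the listener
    lp.join()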
PS. If you're CPU-bound
Apart from the logging process, no other process needs a StreamHandler (which logging adds by default). While profiling, I found that it adds a lot of CPU usage compared to having only the QueueHandler, due to all the extra formatting, record creation, and so on. You can remove it from the non-logging processes:
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
logger.removeHandler(handler)
break
Or, if you haven't added any handlers yet, the default StreamHandler can be removed before adding the QueueHandler with: logger.removeHandler(logger.handlers[0])
Solution 4:
Yet another solution, with a focus on simplicity for anyone (like me) who gets here from Google. Logging should be easy! Works only on Python 3.2 or higher.
import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random
def f(i):
time.sleep(random.uniform(.01, .05))
logging.info('function called with {} in worker thread.'.format(i))
time.sleep(random.uniform(.01, .05))
return i
def worker_init(q):
# all records from worker processes go to qh and then into q
qh = QueueHandler(q)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(qh)
def logger_init():
q = multiprocessing.Queue()
# this is the handler for all log records
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
# ql gets records from the queue and sends them to the handler
ql = QueueListener(q, handler)
ql.start()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# add the handler to the logger so records from this process are handled
logger.addHandler(handler)
return ql, q
def main():
q_listener, q = logger_init()
logging.info('hello from main thread')
pool = multiprocessing.Pool(4, worker_init, [q])
for result in pool.map(f, range(10)):
pass
pool.close()
pool.join()
q_listener.stop()
if __name__ == '__main__':
main()
Solution 5:
As of 2020, there seems to be a simpler way of logging with multiprocessing.
This function will create the logger. You can set the format here, as well as where the output goes (file, stdout):
def create_logger():
import multiprocessing, logging
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
handler = logging.FileHandler('logs/your_file_name.log')
handler.setFormatter(formatter)
# this bit will make sure you won't have
# duplicated messages in the output
if not len(logger.handlers):
logger.addHandler(handler)
return logger
Instantiate the logger in the initialization:
if __name__ == '__main__':
from multiprocessing import Pool
logger = create_logger()
logger.info('Starting pooling')
p = Pool()
# rest of the code
Now, all you need to do is add this reference in every function that needs logging:
logger = create_logger()
and to output messages:
logger.info(f'My message from {something}')
Hope this helps.
Solution 6:
An alternative might be the various non-file-based logging handlers in the logging package:
SocketHandler
DatagramHandler
SyslogHandler
(and others)
This way, you could easily have a logging daemon somewhere that you can write to safely and that handles the results correctly - for example, a simple socket server that just unpickles the messages and emits them to its own rotating file handler; a minimal sketch of that idea is shown below.
The SyslogHandler would take care of this for you, too. And of course you could use your own instance of syslog rather than the system one.
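For example, a rough sketch of such a socket-server daemon, assuming the workers attach a logging.handlers.SocketHandler; the wire format handled below (a 4-byte big-endian length prefix followed by a pickled record dict) is what SocketHandler sends, while the host, port and filenames are illustrative choices:
import logging
import logging.handlers
import pickle
import socketserver
import struct

class LogRecordHandler(socketserver.StreamRequestHandler):
    def handle(self):
        while True:
            header = self.rfile.read(4)
            if len(header) < 4:
                break
            length = struct.unpack(">L", header)[0]
            data = self.rfile.read(length)
            # Rebuild the LogRecord and hand it to this process's own handlers.
            record = logging.makeLogRecord(pickle.loads(data))
            logging.getLogger(record.name).handle(record)

if __name__ == "__main__":
    logging.getLogger().addHandler(logging.handlers.RotatingFileHandler(
        "central.log", maxBytes=1_000_000, backupCount=5))
    server = socketserver.ThreadingTCPServer(
        ("localhost", logging.handlers.DEFAULT_TCP_LOGGING_PORT), LogRecordHandler)
    server.serve_forever()

# A worker process would then only need:
#   logging.getLogger().addHandler(logging.handlers.SocketHandler(
#       "localhost", logging.handlers.DEFAULT_TCP_LOGGING_PORT))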
Solution 7:
A variant of the others that keeps the logging and the queue thread separate.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
# Also, Queue.empty() is not reliable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
Solution 8:
All the current solutions are too coupled to the logging configuration by using a handler. My solution has the following architecture and features:
* You can use any logging configuration you want
* Logging is done in a daemon thread
* Safe shutdown of the daemon by using a context manager
* Communication with the logging thread is done via multiprocessing.Queue
* In subprocesses, all logging.Logger records (including from already-defined instances) are sent to the queue
* New: tracebacks and messages are formatted before being sent to the queue to prevent pickling errors
Code with a usage example and output can be found in the following Gist: https://gist.github.com/schlamar/7003737
A rough sketch of the daemon-thread-plus-context-manager idea is shown below.
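The actual code lives in the Gist; the following is only a rough approximation of that idea (the logging_listener name is mine, and the patching of Logger instances in subprocesses is omitted):
import contextlib
import logging
import multiprocessing
import threading

@contextlib.contextmanager
def logging_listener(queue):
    """Drain LogRecords from the queue in a daemon thread; shut down cleanly on exit."""
    def _drain():
        while True:
            record = queue.get()
            if record is None:               # sentinel ends the thread
                break
            logging.getLogger(record.name).handle(record)

    thread = threading.Thread(target=_drain, daemon=True)
    thread.start()
    try:
        yield queue                          # subprocesses put their LogRecords here
    finally:
        queue.put(None)
        thread.join()

if __name__ == "__main__":
    with logging_listener(multiprocessing.Queue()) as log_queue:
        pass  # start subprocesses that send their records to log_queue here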
Solution 9:
Since we can represent multiprocess logging as many publishers and one subscriber (listener), using ZeroMQ to implement PUB-SUB messaging is indeed an option.
Moreover, the PyZMQ module (the Python bindings for ZMQ) implements PUBHandler, an object for publishing log messages over a zmq.PUB socket.
There is a solution on the web for centralized logging from a distributed application using PyZMQ and PUBHandler, which can easily be adopted for working locally with multiple publishing processes.
# imports assumed by the snippets below (config and pub_logger come from the
# surrounding application and are not shown)
import logging
import os
import socket
from logging import handlers
from multiprocessing import Process

import zmq
from zmq.log.handlers import PUBHandler

formatters = {
logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}
# This one will be used by publishing processes
class PUBLogger:
def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
self._logger = logging.getLogger(__name__)
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self.pub = self.ctx.socket(zmq.PUB)
self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
self._handler = PUBHandler(self.pub)
self._handler.formatters = formatters
self._logger.addHandler(self._handler)
@property
def logger(self):
return self._logger
# This one will be used by listener process
class SUBLogger:
def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
self.output_dir = output_dir
self._logger = logging.getLogger()
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self._sub = self.ctx.socket(zmq.SUB)
self._sub.bind('tcp://*:{1}'.format(ip, port))
self._sub.setsockopt(zmq.SUBSCRIBE, "")
handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
handler.setFormatter(formatter)
self._logger.addHandler(handler)
@property
def sub(self):
return self._sub
@property
def logger(self):
return self._logger
# And that's the way we actually run things:
# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
sub_logger = SUBLogger(ip)
while not event.is_set():
try:
topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
log_msg = getattr(logging, topic.lower())
log_msg(message)
except zmq.ZMQError as zmq_error:
if zmq_error.errno == zmq.EAGAIN:
pass
# Publisher processes loggers should be initialized as follows:
class Publisher:
def __init__(self, stop_event, proc_id):
self.stop_event = stop_event
self.proc_id = proc_id
self._logger = pub_logger.PUBLogger('127.0.0.1').logger
def run(self):
self._logger.info("{0} - Sending message".format(proc_id))
def run_worker(event, proc_id):
worker = Publisher(event, proc_id)
worker.run()
# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
args=('127.0.0.1', stop_event,))
sub_logger_process.start()
#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
processes.append(Process(target=run_worker,
args=(stop_event, i,)))
for p in processes:
p.start()
Solution 10:
I also like zzzeek's answer, but Andre is right that a queue is required to prevent garbling. I had some luck with the pipe, but did see garbling, which is somewhat to be expected. Implementing it turned out to be harder than I thought, particularly because of running on Windows, where there are some additional restrictions about global variables and such (see: How's Python Multiprocessing Implemented on Windows?).
But I finally got it working. This example probably isn't perfect, so comments and suggestions are welcome. It also does not support setting the formatter or anything other than the root logger. Basically, you have to re-initialize the logger in each of the pool processes with the queue and set the other attributes on the logger.
Again, any suggestions on how to make the code better are welcome. I certainly don't know all the Python tricks yet :-)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue
class MultiProcessingLogHandler(logging.Handler):
def __init__(self, handler, queue, child=False):
logging.Handler.__init__(self)
self._handler = handler
self.queue = queue
# we only want one of the loggers to be pulling from the queue.
# If there is a way to do this without needing to be passed this
# information, that would be great!
if child == False:
self.shutdown = False
self.polltime = 1
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
#print "receive on"
while (self.shutdown == False) or (self.queue.empty() == False):
# so we block for a short period of time so that we can
# check for the shutdown cases.
try:
record = self.queue.get(True, self.polltime)
self._handler.emit(record)
except Queue.Empty, e:
pass
def send(self, s):
# send just puts it in the queue for the server to retrieve
self.queue.put(s)
def _format_record(self, record):
ei = record.exc_info
if ei:
dummy = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # to avoid Unpickleable error
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
time.sleep(self.polltime+1) # give some time for messages to enter the queue.
self.shutdown = True
time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown
def __del__(self):
self.close() # hopefully this aids in orderly shutdown when things are going poorly.
def f(x):
# just a logging command...
logging.critical('function number: ' + str(x))
# to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
time.sleep(x % 3)
def initPool(queue, level):
"""
This causes the logging module to be initialized with the necessary info
in pool threads to work correctly.
"""
logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
logging.getLogger('').setLevel(level)
if __name__ == '__main__':
stream = StringIO.StringIO()
logQueue = multiprocessing.Queue(100)
handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
logging.getLogger('').addHandler(handler)
logging.getLogger('').setLevel(logging.DEBUG)
logging.debug('starting main')
# when building the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
pool.map(f, range(0,50))
pool.close()
logging.debug('done')
logging.shutdown()
print "stream output is:"
print stream.getvalue()
Solution 11:
I suggest using the logger_tt library: https://github.com/Dragon2fly/logger_tt
The multiprocessing_logging library does not work on my macOS, but logger_tt does.
Solution 12:
The concurrent-log-handler package seems to do the job perfectly. Tested on Windows; it also supports POSIX systems.
Main idea
Create a separate file with a function that returns a logger. The logger must get a fresh ConcurrentRotatingFileHandler instance for each process. An example function get_logger() is given below.
Creating the logger is done at process initialization. For a multiprocessing.Process subclass, that means the beginning of the run() method.
Detailed instructions
In this example, I will use the following file structure
.
│-- child.py <-- For a child process
│-- logs.py <-- For setting up the logs for the app
│-- main.py <-- For a main process
│-- myapp.py <-- For starting the app
│-- somemodule.py <-- For an example, a "3rd party module using standard logging"
Code
Child process
# child.py
import multiprocessing as mp
import time
from somemodule import do_something
class ChildProcess(mp.Process):
def __init__(self):
self.logger = None
super().__init__()
def run(self):
from logs import get_logger
self.logger = get_logger()
while True:
time.sleep(1)
self.logger.info("Child process")
do_something()
A simple child process that inherits from multiprocessing.Process and simply logs the text "Child process" to the file.
Important: get_logger() is called inside run(), or elsewhere inside the child process (not at module level and not in __init__()). This is required because get_logger() creates the ConcurrentRotatingFileHandler instance, and a fresh instance is needed for every process.
The do_something is there only to demonstrate that this works with third-party library code that has no idea you are using concurrent-log-handler.
Main process
# main.py
import logging
import multiprocessing as mp
import time
from child import ChildProcess
from somemodule import do_something
class MainProcess(mp.Process):
def __init__(self):
self.logger = logging.getLogger()
super().__init__()
def run(self):
from logs import get_logger
self.logger = get_logger()
self.child = ChildProcess()
self.child.daemon = True
self.child.start()
while True:
time.sleep(0.5)
self.logger.critical("Main process")
do_something()
The main process, which logs "Main process" to the file twice a second. It also inherits from multiprocessing.Process.
The same comments about get_logger() and do_something() apply as for the child process.
Logger setup
# logs.py
import logging
import os
from concurrent_log_handler import ConcurrentRotatingFileHandler
LOGLEVEL = logging.DEBUG
def get_logger():
logger = logging.getLogger()
if logger.handlers:
return logger
# Use an absolute path to prevent file rotation trouble.
logfile = os.path.abspath("mylog.log")
logger.setLevel(LOGLEVEL)
# Rotate log after reaching 512K, keep 5 old copies.
filehandler = ConcurrentRotatingFileHandler(
logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
)
filehandler.setLevel(LOGLEVEL)
# create also handler for displaying output in the stdout
ch = logging.StreamHandler()
ch.setLevel(LOGLEVEL)
formatter = logging.Formatter(
"%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
)
# add formatter to ch
ch.setFormatter(formatter)
filehandler.setFormatter(formatter)
logger.addHandler(ch)
logger.addHandler(filehandler)
return logger
This uses the ConcurrentRotatingFileHandler from the concurrent-log-handler package. Each process needs its own fresh ConcurrentRotatingFileHandler instance.
Note that all the arguments for the ConcurrentRotatingFileHandler should be the same in every process.
Example application
# myapp.py
if __name__ == "__main__":
from main import MainProcess
p = MainProcess()
p.start()
This is just a simple example of how to start the multiprocess application.
Example of a third-party module using standard logging
# somemodule.py
import logging
logger = logging.getLogger("somemodule")
def do_something():
logging.info("doing something")
This is just a simple example to test that loggers from third-party code work as expected.
Example output
2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
Solution 13:
Just publish your logger instance somewhere. That way, the other modules and clients can use your API to get the logger without having to import multiprocessing. A tiny hypothetical sketch is shown below.
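A minimal illustration of what "publishing" the logger through your own API could look like (the module and function names are made up):
# myframework/log.py  (hypothetical module exposed by the framework)
import multiprocessing

LOG = multiprocessing.get_logger()

def get_logger():
    # Clients call this instead of importing multiprocessing themselves.
    return LOG
Client modules then just do from myframework.log import get_logger and log through the returned logger.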
Solution 14:
How about delegating all the logging to another process that reads all log entries from a queue?
LOG_QUEUE = multiprocessing.JoinableQueue()
class CentralLogger(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
self.log = logging.getLogger('some_config')
self.log.info("Started Central Logging process")
def run(self):
while True:
log_level, message = self.queue.get()
if log_level is None:
self.log.info("Shutting down Central Logging process")
break
else:
self.log.log(log_level, message)
central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()
Simply share LOG_QUEUE via any of the multiprocess mechanisms (or even inheritance) and it all works out fine! A hypothetical worker-side sketch follows.
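For illustration, a worker-side sketch that builds on the CentralLogger/LOG_QUEUE definitions above; the worker() function is my own addition, and a None log level is what tells the central process to shut down:
import logging
import multiprocessing

def worker(n, queue):
    # Any process holding the queue can log by sending (level, message) pairs.
    queue.put((logging.INFO, "worker %d starting" % n))
    # ... do the actual work ...
    queue.put((logging.INFO, "worker %d done" % n))

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=worker, args=(i, LOG_QUEUE)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    LOG_QUEUE.put((None, None))      # sentinel: stop the central logging process
    central_logger_process.join()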
Solution 15:
Below is a class that can be used in a Windows environment; it requires ActivePython (for the win32event module). You can also inherit from other logging handlers (StreamHandler, etc.).
class SyncronizedFileHandler(logging.FileHandler):
MUTEX_NAME = 'logging_mutex'
def __init__(self , *args , **kwargs):
self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)
def emit(self, *args , **kwargs):
try:
win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
finally:
win32event.ReleaseMutex(self.mutex)
return ret
Below is an example that demonstrates usage:
import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool
def f(i):
time.sleep(random.randint(0,10) * 0.1)
ch = random.choice(letters)
logging.info( ch * 30)
def init_logging():
'''
initilize the loggers
'''
formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
file_handler = SyncronizedFileHandler(sys.argv[1])
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
#must be called in the parent and in every worker process
init_logging()
if __name__ == '__main__':
#multiprocessing stuff
pool = Pool(processes=10)
imap_result = pool.imap(f , range(30))
for i , _ in enumerate(imap_result):
pass
Solution 16:
I have a solution similar to ironhacker's, except that I use logging.exception in some of my code, and found that I needed to format the exception before passing it back over the queue, since tracebacks aren't picklable:
# imports assumed by this (Python 2) snippet
import logging
import traceback
import cStringIO

class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
if record.exc_info:
# can't pass exc_info across processes so just format now
record.exc_text = self.formatException(record.exc_info)
record.exc_info = None
self.queue.put(record)
def formatException(self, ei):
sio = cStringIO.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()
if s[-1] == "\n":
    s = s[:-1]
return s
Solution 17:
If you have deadlocks occurring in a combination of locks, threads, and forks in the logging module, that is reported in bug report 6721 (see also the related SO question).
There is a small fixup solution posted there.
However, that will just fix any potential deadlocks in logging. It will not fix the possibility of things getting garbled. See the other answers presented here.
Solution 18:
Here's my simple hack/workaround... not the most comprehensive, but easily modifiable and simpler to read and understand, I think, than any other answers I found before writing this:
import logging
import multiprocessing
class FakeLogger(object):
def __init__(self, q):
self.q = q
def info(self, item):
self.q.put('INFO - {}'.format(item))
def debug(self, item):
self.q.put('DEBUG - {}'.format(item))
def critical(self, item):
self.q.put('CRITICAL - {}'.format(item))
def warning(self, item):
self.q.put('WARNING - {}'.format(item))
def some_other_func_that_gets_logger_and_logs(num):
# notice the name gets discarded
# of course you can easily add this to your FakeLogger class
local_logger = logging.getLogger('local')
local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
local_logger.debug('hmm, something may need debugging here')
return num*2
def func_to_parallelize(data_chunk):
# unpack our args
the_num, logger_q = data_chunk
# since we're now in a new process, let's monkeypatch the logging module
logging.getLogger = lambda name=None: FakeLogger(logger_q)
# now do the actual work that happens to log stuff too
new_num = some_other_func_that_gets_logger_and_logs(the_num)
return (the_num, new_num)
if __name__ == '__main__':
multiprocessing.freeze_support()
m = multiprocessing.Manager()
logger_q = m.Queue()
# we have to pass our data to be parallel-processed
# we also need to pass the Queue object so we can retrieve the logs
parallelable_data = [(1, logger_q), (2, logger_q)]
# set up a pool of processes so we can take advantage of multiple CPU cores
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
worker_output = pool.map(func_to_parallelize, parallelable_data)
pool.close() # no more tasks
pool.join() # wrap up current tasks
# get the contents of our FakeLogger object
while not logger_q.empty():
print logger_q.get()
print 'worker output contained: {}'.format(worker_output)
Solution 19:
There is this great package:
Package:
https://pypi.python.org/pypi/multiprocessing-logging/
Code:
https://github.com/jruere/multiprocessing-logging
Install:
pip install multiprocessing-logging
Then add:
import multiprocessing_logging
# This enables logs inside process
multiprocessing_logging.install_mp_handler()
Solution 20:
For whoever might need it, I wrote a decorator for the multiprocessing_logging package that adds the current process name to each log record, so it becomes clear who logs what.
It also calls install_mp_handler(), so there is no point in running that yourself before creating the pool.
This lets me see which worker creates which log messages.
Here is the blueprint with an example:
import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging
# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
class MultiProcessLogFilter(logging.Filter):
def filter(self, record):
try:
process_name = multiprocessing.current_process().name
except BaseException:
process_name = __name__
record.msg = f'{process_name} :: {record.msg}'
return True
multiprocessing_logging.install_mp_handler()
f = MultiProcessLogFilter()
# Wraps is needed here so apply / apply_async know the function name
@wraps(fn)
def wrapper(*args, **kwargs):
logger.removeFilter(f)
logger.addFilter(f)
return fn(*args, **kwargs)
return wrapper
# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
logger.info(f'test function called via: {argument}')
# You can also redecorate undecorated functions
def undecorated_function():
logger.info('I am not decorated')
@logs_mp_process_names
def redecorated(*args, **kwargs):
return undecorated_function(*args, **kwargs)
# Enjoy
if __name__ == '__main__':
with multiprocessing.Pool() as mp_pool:
# Also works with apply_async
mp_pool.apply(test, ('mp pool',))
mp_pool.apply(redecorated)
logger.info('some main logs')
test('main program')
Solution 21:
One of the alternatives is to write the multiprocessing logging to known files and register an atexit handler to join those processes and read the logs back on stderr; however, you won't get a real-time flow of output messages on stderr that way. A hedged sketch of this idea follows.
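A sketch of that idea, assuming each process logs to a file named after its pid (the mplog-*.log naming is just an illustration):
import atexit
import glob
import logging
import multiprocessing
import os
import sys

def setup_process_log():
    handler = logging.FileHandler("mplog-%d.log" % os.getpid())
    handler.setFormatter(logging.Formatter("%(asctime)s %(process)d %(message)s"))
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.INFO)

def worker(i):
    setup_process_log()
    logging.info("worker %d ran", i)

def dump_logs():
    # Runs in the parent at interpreter exit, after the workers have been joined.
    for path in sorted(glob.glob("mplog-*.log")):
        with open(path) as f:
            sys.stderr.write(f.read())

if __name__ == "__main__":
    atexit.register(dump_logs)
    setup_process_log()
    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    logging.info("main process done")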
Solution 22:
The simplest idea, as mentioned:
+ Grab the filename and the process ID of the current process.
+ Set up a WatchedFileHandler. The reasons for using this handler are discussed in detail elsewhere, but in short: the other logging handlers have certain worse race conditions, and this one has the shortest window for a race condition.
+ Choose a path to save the logs to, such as /var/log/...
A minimal sketch follows.
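A minimal sketch under those assumptions; the /var/log/myapp path and the per-pid filename scheme are illustrative choices:
import logging
import logging.handlers
import os

def get_process_logger(log_dir="/var/log/myapp"):
    pid = os.getpid()
    logger = logging.getLogger("myapp.%d" % pid)
    if not logger.handlers:
        # One WatchedFileHandler per process, each writing to its own pid-named file.
        handler = logging.handlers.WatchedFileHandler(
            os.path.join(log_dir, "%d.log" % pid))
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(process)d %(levelname)s %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(logging.DEBUG)
    return logger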