¿Cómo debo iniciar sesión mientras uso multiprocesamiento en Python?
En este momento tengo un módulo central en un marco que genera múltiples procesos usando el multiprocessing
módulo Python 2.6 . Debido a que utiliza multiprocessing
, existe un registro compatible con multiprocesamiento a nivel de módulo LOG = multiprocessing.get_logger()
. Según los documentos , este registrador ( EDITAR ) no tiene bloqueos de procesos compartidos para que no confunda las cosas sys.stderr
(o cualquier identificador de archivo) al tener varios procesos escribiendo en él simultáneamente.
El problema que tengo ahora es que los otros módulos del marco no son compatibles con el multiprocesamiento. A mi modo de ver, necesito que todas las dependencias de este módulo central utilicen un registro compatible con multiprocesamiento. Eso es molesto dentro del marco, y mucho menos para todos los clientes del marco. ¿Hay alternativas en las que no estoy pensando?
Acabo de escribir un controlador de registro propio que simplemente envía todo al proceso principal a través de una tubería. Sólo lo he estado probando durante diez minutos pero parece que funciona bastante bien.
( Nota: esto está codificado en RotatingFileHandler
, que es mi propio caso de uso).
Actualización: @javier ahora mantiene este enfoque como un paquete disponible en Pypi; consulte registro de multiprocesamiento en Pypi, github en https://github.com/jruere/multiprocessing-logging
Actualización: ¡Implementación!
Esto ahora utiliza una cola para el manejo correcto de la concurrencia y también se recupera de errores correctamente. He estado usando esto en producción durante varios meses y la versión actual a continuación funciona sin problemas.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
La única forma de solucionar este problema de forma no intrusiva es:
- Genere cada proceso de trabajo de modo que su registro vaya a un descriptor de archivo diferente (al disco o a la tubería). Lo ideal es que todas las entradas del registro tengan una marca de tiempo.
- Su proceso de controlador puede entonces hacer una de las siguientes cosas:
- Si utiliza archivos de disco: combine los archivos de registro al final de la ejecución, ordenados por marca de tiempo
- Si utiliza tuberías (recomendado): combine las entradas de registro sobre la marcha de todas las tuberías en un archivo de registro central. (Por ejemplo, periódicamente
select
desde los descriptores de archivos de las canalizaciones, realice una clasificación por fusión en las entradas de registro disponibles y vacíelas al registro centralizado. Repita).