¿El grupo de procesos de Python no es demoníaco?
¿Sería posible crear un grupo de Python que no sea demoníaco? Quiero que un grupo pueda llamar a una función que tenga otro grupo dentro.
Quiero esto porque los procesos demoníacos no pueden crear procesos. Específicamente, causará el error:
AssertionError: daemonic processes are not allowed to have children
Por ejemplo, considere el escenario en el que function_a
hay function_b
un grupo que se ejecuta function_c
. Esta cadena de funciones fallará porque function_b
se está ejecutando en un proceso demonio y los procesos demonio no pueden crear procesos.
La multiprocessing.pool.Pool
clase crea los procesos de trabajo en su __init__
método, los vuelve demoníacos y los inicia, y no es posible restablecer su daemon
atributo False
antes de que se iniciaran (y después ya no está permitido). Pero puede crear su propia subclase de multiprocesing.pool.Pool
( multiprocessing.Pool
es solo una función contenedora) y sustituirla por su propia multiprocessing.Process
subclase, que siempre no es demoníaca, para usarla en los procesos de trabajo.
Aquí hay un ejemplo completo de cómo hacer esto. Las partes importantes son las dos clases NoDaemonProcess
y MyPool
en la parte superior y llamar pool.close()
y pool.join()
en su MyPool
instancia al final.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()
Tuve la necesidad de emplear un grupo no demoníaco en Python 3.7 y terminé adaptando el código publicado en la respuesta aceptada. A continuación se muestra el fragmento que crea el grupo no demoníaco:
import multiprocessing.pool
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(NestablePool, self).__init__(*args, **kwargs)
Como la implementación actual de multiprocessing
ha sido ampliamente refactorizada para basarse en contextos, debemos proporcionar una NoDaemonContext
clase que tenga nuestro NoDaemonProcess
atributo as. NestablePool
Luego usará ese contexto en lugar del predeterminado.
Dicho esto, debo advertir que este enfoque conlleva al menos dos advertencias:
- Todavía depende de los detalles de implementación del
multiprocessing
paquete y, por lo tanto, podría fallar en cualquier momento. - Hay razones válidas por las que
multiprocessing
resulta tan difícil utilizar procesos no demoníacos, muchas de las cuales se explican aquí . El más convincente en mi opinión es:
En cuanto a permitir que los subprocesos secundarios generen sus propios subprocesos, se corre el riesgo de crear un pequeño ejército de 'nietos' zombis si los subprocesos principales o secundarios terminan antes de que el subproceso se complete y regrese.
A partir de Python 3.8, concurrent.futures.ProcessPoolExecutor
no tiene esta limitación. Puede tener un grupo de procesos anidados sin ningún problema:
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time
def pid():
return current_process().pid
def _square(i): # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square
def _sum_squares(i, j): # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares
def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')
if __name__ == "__main__":
main()
El código de demostración anterior se probó con Python 3.8.
Sin embargo, una limitación de ProcessPoolExecutor
, es que no tiene maxtasksperchild
. Si necesita esto, considere la respuesta de Massimiliano .
Crédito: respuesta de jfs