¿Grupo de subprocesos similar al grupo de multiprocesamiento?
¿Existe una clase Pool para subprocesos de trabajo , similar a la clase Pool del módulo multiprocesamiento ?
Me gusta, por ejemplo, la manera fácil de paralelizar una función de mapa.
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
sin embargo, me gustaría hacerlo sin la sobrecarga de crear nuevos procesos.
Sé sobre el GIL. Sin embargo, en mi caso de uso, la función será una función C vinculada a IO para la cual el contenedor de Python liberará el GIL antes de la llamada a la función real.
¿Tengo que escribir mi propio grupo de subprocesos?
Acabo de descubrir que en realidad hay una interfaz Pool basada en subprocesos en el multiprocessing
módulo, sin embargo, está algo oculta y no está documentada adecuadamente.
Se puede importar mediante
from multiprocessing.pool import ThreadPool
Se implementa utilizando una clase de proceso ficticia que envuelve un hilo de Python. Esta clase de proceso basada en subprocesos se puede encontrar en multiprocessing.dummy
la que se menciona brevemente en los documentos . Este módulo ficticio supuestamente proporciona toda la interfaz de multiprocesamiento basada en subprocesos.
En Python 3 puedes usar concurrent.futures.ThreadPoolExecutor
, es decir:
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
Consulte los documentos para obtener más información y ejemplos.
Sí, y parece tener (más o menos) la misma API.
import multiprocessing
def worker(lnk):
....
def start_process():
.....
....
if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)
pool.map(worker, inputs)
....
Para algo muy simple y liviano (ligeramente modificado desde aquí ):
from Queue import Queue
from threading import Thread
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
if __name__ == '__main__':
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(100)]
def wait_delay(d):
print 'sleeping for (%d)sec' % d
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Para admitir devoluciones de llamada al finalizar la tarea, simplemente puede agregar la devolución de llamada a la tupla de tareas.