¿Grupo de subprocesos similar al grupo de multiprocesamiento?

Resuelto Martin asked hace 14 años • 11 respuestas

¿Existe una clase Pool para subprocesos de trabajo , similar a la clase Pool del módulo multiprocesamiento ?

Me gusta, por ejemplo, la manera fácil de paralelizar una función de mapa.

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

sin embargo, me gustaría hacerlo sin la sobrecarga de crear nuevos procesos.

Sé sobre el GIL. Sin embargo, en mi caso de uso, la función será una función C vinculada a IO para la cual el contenedor de Python liberará el GIL antes de la llamada a la función real.

¿Tengo que escribir mi propio grupo de subprocesos?

Martin avatar Jun 14 '10 04:06 Martin
Aceptado

Acabo de descubrir que en realidad hay una interfaz Pool basada en subprocesos en el multiprocessingmódulo, sin embargo, está algo oculta y no está documentada adecuadamente.

Se puede importar mediante

from multiprocessing.pool import ThreadPool

Se implementa utilizando una clase de proceso ficticia que envuelve un hilo de Python. Esta clase de proceso basada en subprocesos se puede encontrar en multiprocessing.dummyla que se menciona brevemente en los documentos . Este módulo ficticio supuestamente proporciona toda la interfaz de multiprocesamiento basada en subprocesos.

Martin avatar Aug 02 '2010 09:08 Martin

En Python 3 puedes usar concurrent.futures.ThreadPoolExecutor, es decir:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Consulte los documentos para obtener más información y ejemplos.

Adrian Adamiak avatar Jul 17 '2012 19:07 Adrian Adamiak

Sí, y parece tener (más o menos) la misma API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
warfares avatar Nov 30 '2012 19:11 warfares

Para algo muy simple y liviano (ligeramente modificado desde aquí ):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Para admitir devoluciones de llamada al finalizar la tarea, simplemente puede agregar la devolución de llamada a la tupla de tareas.

dgorissen avatar Aug 31 '2011 13:08 dgorissen