¿Cómo ejecutar funciones en paralelo?

Resuelto lmcadory asked hace 13 años • 9 respuestas

Estoy intentando ejecutar varias funciones en paralelo en Python.

Tengo algo como esto:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Quiero llamar a func1 y func2 y ejecutarlos al mismo tiempo. Las funciones no interactúan entre sí ni en el mismo objeto. En este momento tengo que esperar a que finalice func1 antes de que comience func2. ¿Cómo hago algo como lo siguiente?

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Quiero poder crear ambos directorios casi al mismo tiempo porque cada minuto cuento cuántos archivos se crean. Si el directorio no está allí, me desviará del tiempo.

lmcadory avatar Aug 26 '11 22:08 lmcadory
Aceptado

Podrías usar threadingo multiprocessing.

Debido a las peculiaridades de CPython , threadinges poco probable que se logre un verdadero paralelismo. Por esta razón, multiprocessinggeneralmente es una mejor apuesta.

Aquí tienes un ejemplo completo:

from multiprocessing import Process


def func1():
    print("func1: starting")
    for i in range(10000000):
        pass

    print("func1: finishing")


def func2():
    print("func2: starting")
    for i in range(10000000):
        pass

    print("func2: finishing")


if __name__ == "__main__":
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()
    p1.join()
    p2.join()

La mecánica de iniciar/unir procesos secundarios se puede resumir fácilmente en una función similar a su runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
NPE avatar Aug 26 '2011 15:08 NPE

Si sus funciones realizan principalmente trabajo de E/S (y menos trabajo de CPU) y tiene Python 3.2+, puede usar ThreadPoolExecutor :

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Si sus funciones realizan principalmente trabajo de CPU (y menos trabajo de E/S) y tiene Python 3.2+, puede usar ProcessPoolExecutor :

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
    with ProcessPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])

Alternativamente, si solo tienes Python 2.6+, puedes usar el módulo de multiprocesamiento directamente:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

def task_1():
    print('CPU task 1 running!')

def task_2():
    print('CPU task 2 running!')

if __name__ == '__main__':
    run_cpu_tasks_in_parallel([
        task_1,
        task_2,
    ])
David Foster avatar May 14 '2019 21:05 David Foster

Esto se puede hacer elegantemente con Ray , un sistema que le permite paralelizar y distribuir fácilmente su código Python.

Para paralelizar su ejemplo, necesitaría definir sus funciones con el @ray.remotedecorador y luego invocarlas con .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Si pasa el mismo argumento a ambas funciones y el argumento es grande, una forma más eficiente de hacerlo es usar ray.put(). Esto evita que el argumento grande se serialice dos veces y se creen dos copias en memoria del mismo:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Importante : si func1()devuelve func2()resultados, debe reescribir el código de la siguiente manera:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Hay una serie de ventajas al utilizar Ray sobre el módulo de multiprocesamiento . En particular, el mismo código se ejecutará tanto en una sola máquina como en un grupo de máquinas. Para conocer más ventajas de Ray, consulte esta publicación relacionada .

Ion Stoica avatar Feb 03 '2019 19:02 Ion Stoica