¿Cómo ejecutar funciones en paralelo?
Estoy intentando ejecutar varias funciones en paralelo en Python.
Tengo algo como esto:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
Quiero llamar a func1 y func2 y ejecutarlos al mismo tiempo. Las funciones no interactúan entre sí ni en el mismo objeto. En este momento tengo que esperar a que finalice func1 antes de que comience func2. ¿Cómo hago algo como lo siguiente?
process.py
from files import func1, func2
runBothFunc(func1(), func2())
Quiero poder crear ambos directorios casi al mismo tiempo porque cada minuto cuento cuántos archivos se crean. Si el directorio no está allí, me desviará del tiempo.
Podrías usar threading
o multiprocessing
.
Debido a las peculiaridades de CPython , threading
es poco probable que se logre un verdadero paralelismo. Por esta razón, multiprocessing
generalmente es una mejor apuesta.
Aquí tienes un ejemplo completo:
from multiprocessing import Process
def func1():
print("func1: starting")
for i in range(10000000):
pass
print("func1: finishing")
def func2():
print("func2: starting")
for i in range(10000000):
pass
print("func2: finishing")
if __name__ == "__main__":
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
La mecánica de iniciar/unir procesos secundarios se puede resumir fácilmente en una función similar a su runBothFunc
:
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)
Si sus funciones realizan principalmente trabajo de E/S (y menos trabajo de CPU) y tiene Python 3.2+, puede usar ThreadPoolExecutor :
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
Si sus funciones realizan principalmente trabajo de CPU (y menos trabajo de E/S) y tiene Python 3.2+, puede usar ProcessPoolExecutor :
from concurrent.futures import ProcessPoolExecutor
def run_cpu_tasks_in_parallel(tasks):
with ProcessPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
Alternativamente, si solo tienes Python 2.6+, puedes usar el módulo de multiprocesamiento directamente:
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
def task_1():
print('CPU task 1 running!')
def task_2():
print('CPU task 2 running!')
if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
Esto se puede hacer elegantemente con Ray , un sistema que le permite paralelizar y distribuir fácilmente su código Python.
Para paralelizar su ejemplo, necesitaría definir sus funciones con el @ray.remote
decorador y luego invocarlas con .remote
.
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
Si pasa el mismo argumento a ambas funciones y el argumento es grande, una forma más eficiente de hacerlo es usar ray.put()
. Esto evita que el argumento grande se serialice dos veces y se creen dos copias en memoria del mismo:
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
Importante : si func1()
devuelve func2()
resultados, debe reescribir el código de la siguiente manera:
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
Hay una serie de ventajas al utilizar Ray sobre el módulo de multiprocesamiento . En particular, el mismo código se ejecutará tanto en una sola máquina como en un grupo de máquinas. Para conocer más ventajas de Ray, consulte esta publicación relacionada .