Combine Pool.map con Array de memoria compartida en multiprocesamiento de Python
Tengo una gran variedad de datos (de solo lectura) que quiero que varios procesos procesen en paralelo.
Me gusta la Pool.map
función y me gustaría usarla para calcular funciones con esos datos en paralelo.
Vi que se puede usar la clase Value
o Array
para usar datos de memoria compartida entre procesos. Pero cuando intento usar esto, aparece un mensaje RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance
cuando uso la función Pool.map:
Aquí hay un ejemplo simplificado de lo que estoy tratando de hacer:
from sys import stdin
from multiprocessing import Pool, Array
def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
# this works
print count_it( toShare, "a" )
pool = Pool()
# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
¿Alguien puede decirme qué estoy haciendo mal aquí?
Entonces, lo que me gustaría hacer es pasar información sobre una matriz asignada de memoria compartida recién creada a los procesos después de que se hayan creado en el grupo de procesos.
Lo intenté de nuevo porque acabo de ver la recompensa;)
Básicamente, creo que el mensaje de error significa lo que dice: memoria compartida multiprocesamiento. Las matrices no se pueden pasar como argumentos (mediante decapado). No tiene sentido serializar los datos; el punto es que los datos son memoria compartida. Por lo tanto, debe hacer que la matriz compartida sea global. Creo que es mejor ponerlo como atributo de un módulo, como en mi primera respuesta, pero dejarlo como una variable global en su ejemplo también funciona bien. Teniendo en cuenta su punto de no querer establecer los datos antes de la bifurcación, aquí hay un ejemplo modificado. Si quisiera tener más de una posible matriz compartida (y es por eso que quería pasar toShare como argumento), podría hacer de manera similar una lista global de matrices compartidas y simplemente pasar el índice a count_it (que se convertiría en for c in toShare[i]:
).
from sys import stdin
from multiprocessing import Pool, Array, Process
def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool()
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[EDITAR: Lo anterior no funciona en Windows porque no se usa fork. Sin embargo, lo siguiente funciona en Windows, aún usando Pool, así que creo que esto es lo más cercano a lo que desea:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule
def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count
def initProcess(share):
mymodule.toShare = share
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
No estoy seguro de por qué el mapa no selecciona la matriz, pero el proceso y el grupo sí lo hacen; creo que tal vez se haya transferido en el punto de inicialización del subproceso en Windows. Sin embargo, tenga en cuenta que los datos todavía se configuran después de la bifurcación.
Si estás viendo:
RuntimeError: los objetos sincronizados solo deben compartirse entre procesos mediante herencia
Considere su uso multiprocessing.Manager
ya que no tiene esta limitación. El administrador trabaja considerando que presumiblemente se ejecuta en un proceso completamente separado.
import ctypes
import multiprocessing
# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock() # pylint: disable=no-member
with counter_lock:
counter.value = count = counter.value + 1
Si los datos son de solo lectura, conviértalos en una variable en un módulo antes de la bifurcación de Pool. Entonces todos los procesos secundarios deberían poder acceder a él y no se copiará siempre que no le escriba.
import myglobals # anything (empty .py file)
myglobals.data = []
def count_it( key ):
count = 0
for c in myglobals.data:
if c == key:
count += 1
return count
if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )
Si desea intentar utilizar Array, puede probar con el lock=False
argumento de palabra clave (es verdadero de forma predeterminada).
El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que significa el mensaje de error cuando dice "los objetos sólo deben compartirse entre procesos mediante herencia". Los datos compartidos deben ser heredados, es decir, globales si desea compartirlos mediante la clase Pool.
Si necesita pasarlos explícitamente, es posible que deba utilizar multiprocesamiento.Proceso. Aquí está su ejemplo reelaborado:
from multiprocessing import Process, Array, Queue
def count_it( q, arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
q.put((key, count))
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
q = Queue()
keys = ['a', 'b', 's', 'd']
workers = [Process(target=count_it, args = (q, toShare, key))
for key in keys]
for p in workers:
p.start()
for p in workers:
p.join()
while not q.empty():
print q.get(),
Salida: ('s', 9) ('a', 2) ('b', 3) ('d', 12)
El orden de los elementos de la cola puede variar.
Para hacer esto más genérico y similar a Pool, puede crear un número fijo de N procesos, dividir la lista de claves en N partes y luego usar una función contenedora como objetivo del proceso, que llamará a count_it para cada clave de la lista. se pasa, como:
def wrapper( q, arr, keys ):
for k in keys:
count_it(q, arr, k)