Combine Pool.map con Array de memoria compartida en multiprocesamiento de Python

Resuelto Jeroen Dirks asked hace 15 años • 4 respuestas

Tengo una gran variedad de datos (de solo lectura) que quiero que varios procesos procesen en paralelo.

Me gusta la Pool.mapfunción y me gustaría usarla para calcular funciones con esos datos en paralelo.

Vi que se puede usar la clase Valueo Arraypara usar datos de memoria compartida entre procesos. Pero cuando intento usar esto, aparece un mensaje RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritancecuando uso la función Pool.map:

Aquí hay un ejemplo simplificado de lo que estoy tratando de hacer:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

¿Alguien puede decirme qué estoy haciendo mal aquí?

Entonces, lo que me gustaría hacer es pasar información sobre una matriz asignada de memoria compartida recién creada a los procesos después de que se hayan creado en el grupo de procesos.

Jeroen Dirks avatar Nov 05 '09 01:11 Jeroen Dirks
Aceptado

Lo intenté de nuevo porque acabo de ver la recompensa;)

Básicamente, creo que el mensaje de error significa lo que dice: memoria compartida multiprocesamiento. Las matrices no se pueden pasar como argumentos (mediante decapado). No tiene sentido serializar los datos; el punto es que los datos son memoria compartida. Por lo tanto, debe hacer que la matriz compartida sea global. Creo que es mejor ponerlo como atributo de un módulo, como en mi primera respuesta, pero dejarlo como una variable global en su ejemplo también funciona bien. Teniendo en cuenta su punto de no querer establecer los datos antes de la bifurcación, aquí hay un ejemplo modificado. Si quisiera tener más de una posible matriz compartida (y es por eso que quería pasar toShare como argumento), podría hacer de manera similar una lista global de matrices compartidas y simplemente pasar el índice a count_it (que se convertiría en for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[EDITAR: Lo anterior no funciona en Windows porque no se usa fork. Sin embargo, lo siguiente funciona en Windows, aún usando Pool, así que creo que esto es lo más cercano a lo que desea:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

No estoy seguro de por qué el mapa no selecciona la matriz, pero el proceso y el grupo sí lo hacen; creo que tal vez se haya transferido en el punto de inicialización del subproceso en Windows. Sin embargo, tenga en cuenta que los datos todavía se configuran después de la bifurcación.

robince avatar Nov 12 '2009 12:11 robince

Si estás viendo:

RuntimeError: los objetos sincronizados solo deben compartirse entre procesos mediante herencia

Considere su uso multiprocessing.Managerya que no tiene esta limitación. El administrador trabaja considerando que presumiblemente se ejecuta en un proceso completamente separado.

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
Asclepius avatar Oct 02 '2019 20:10 Asclepius

Si los datos son de solo lectura, conviértalos en una variable en un módulo antes de la bifurcación de Pool. Entonces todos los procesos secundarios deberían poder acceder a él y no se copiará siempre que no le escriba.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Si desea intentar utilizar Array, puede probar con el lock=Falseargumento de palabra clave (es verdadero de forma predeterminada).

robince avatar Nov 04 '2009 20:11 robince

El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que significa el mensaje de error cuando dice "los objetos sólo deben compartirse entre procesos mediante herencia". Los datos compartidos deben ser heredados, es decir, globales si desea compartirlos mediante la clase Pool.

Si necesita pasarlos explícitamente, es posible que deba utilizar multiprocesamiento.Proceso. Aquí está su ejemplo reelaborado:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Salida: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

El orden de los elementos de la cola puede variar.

Para hacer esto más genérico y similar a Pool, puede crear un número fijo de N procesos, dividir la lista de claves en N partes y luego usar una función contenedora como objetivo del proceso, que llamará a count_it para cada clave de la lista. se pasa, como:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)
jwilson avatar Nov 10 '2009 02:11 jwilson