¿Cómo divido un RDD en dos o más RDD?

Resuelto Carlos Bribiescas asked hace 8 años • 4 respuestas

Estoy buscando una manera de dividir un RDD en dos o más RDD. Lo más cercano que he visto es Scala Spark: ¿Dividir la colección en varios RDD? que sigue siendo un único RDD.

Si está familiarizado con SAS, algo como esto:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

lo que resultó en dos conjuntos de datos distintos. Habría que persistir inmediatamente para obtener los resultados que pretendo...

Carlos Bribiescas avatar Oct 06 '15 20:10 Carlos Bribiescas
Aceptado

No es posible generar varios RDD a partir de una única transformación*. Si desea dividir un RDD, debe aplicar una filterpara cada condición de división. Por ejemplo:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

Si sólo tienes una condición binaria y el cálculo es caro, quizás prefieras algo como esto:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

Significa solo un cálculo de predicado único, pero requiere un paso adicional sobre todos los datos.

Es importante tener en cuenta que siempre que un RDD de entrada esté almacenado en caché correctamente y no haya suposiciones adicionales con respecto a la distribución de datos, no existe una diferencia significativa en lo que respecta a la complejidad del tiempo entre el filtro repetido y el bucle for con if-else anidado.

Con N elementos y M condiciones, el número de operaciones que debe realizar es claramente proporcional a N veces M. En el caso de un bucle for, debería estar más cerca de (N + MN) / 2 y el filtro repetido es exactamente NM, pero al final de el día no es más que O(NM). Puede ver mi discusión** con Jason Lenderman para leer sobre algunos pros y contras.

En el nivel muy alto debes considerar dos cosas:

  1. Las transformaciones de Spark son perezosas, hasta que ejecutas una acción tu RDD no se materializa

    ¿Por qué eso importa? Volviendo a mi ejemplo:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    Si luego decido que lo necesito sólo rdd_oddentonces no hay motivo para materializarlo rdd_even.

    Si observa su ejemplo de SAS para calcular, work.split2necesita materializar tanto los datos de entrada como los archivos work.split1.

  2. Los RDD proporcionan una API declarativa. Cuándo usa filtero mapdepende completamente del motor Spark cómo se realiza esta operación. Siempre que las funciones pasadas a las transformaciones estén libres de efectos secundarios, se crean múltiples posibilidades para optimizar todo un proceso.

Al fin y al cabo, este caso no es lo suficientemente especial como para justificar su propia transformación.

Este mapa con patrón de filtro en realidad se usa en un Spark central. Vea mi respuesta a ¿Cómo Sparks RDD.randomSplit realmente divide el RDD y una parte relevante del randomSplitmétodo?

Si el único objetivo es lograr una división en la entrada, es posible utilizar partitionByla cláusula para DataFrameWriterel formato de salida de texto:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

* Sólo hay 3 tipos básicos de transformaciones en Spark:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

donde T, U, W pueden ser tipos atómicos o productos /tuplas (K, V). Cualquier otra operación debe expresarse utilizando alguna combinación de las anteriores. Puede consultar el documento RDD original para obtener más detalles.

** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** Véase también Scala Spark: ¿Dividir la colección en varios RDD?

zero323 avatar Oct 06 '2015 13:10 zero323

Como otros carteles mencionaron anteriormente, no existe una transformación RDD única y nativa que divida los RDD, pero aquí hay algunas operaciones "multiplex" que pueden emular eficientemente una amplia variedad de "división" en RDD, sin leer varias veces:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions

Algunos métodos específicos para la división aleatoria:

http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions

Los métodos están disponibles en el proyecto sílex de código abierto:

https://github.com/willb/silex

Una publicación de blog que explica cómo funcionan:

http://erikerlandson.github.io/blog/2016/02/08/ficient-multiplexing-for-spark-rdds/

def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}

Como se mencionó en otra parte, estos métodos implican un equilibrio entre memoria y velocidad, porque operan calculando los resultados de la partición completa "con entusiasmo" en lugar de "perezosamente". Por lo tanto, es posible que estos métodos tengan problemas de memoria en particiones grandes, donde las transformaciones diferidas más tradicionales no lo harán.

eje avatar Jun 21 '2016 23:06 eje