¿Cómo divido un RDD en dos o más RDD?
Estoy buscando una manera de dividir un RDD en dos o más RDD. Lo más cercano que he visto es Scala Spark: ¿Dividir la colección en varios RDD? que sigue siendo un único RDD.
Si está familiarizado con SAS, algo como esto:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
lo que resultó en dos conjuntos de datos distintos. Habría que persistir inmediatamente para obtener los resultados que pretendo...
No es posible generar varios RDD a partir de una única transformación*. Si desea dividir un RDD, debe aplicar una filter
para cada condición de división. Por ejemplo:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Si sólo tienes una condición binaria y el cálculo es caro, quizás prefieras algo como esto:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
Significa solo un cálculo de predicado único, pero requiere un paso adicional sobre todos los datos.
Es importante tener en cuenta que siempre que un RDD de entrada esté almacenado en caché correctamente y no haya suposiciones adicionales con respecto a la distribución de datos, no existe una diferencia significativa en lo que respecta a la complejidad del tiempo entre el filtro repetido y el bucle for con if-else anidado.
Con N elementos y M condiciones, el número de operaciones que debe realizar es claramente proporcional a N veces M. En el caso de un bucle for, debería estar más cerca de (N + MN) / 2 y el filtro repetido es exactamente NM, pero al final de el día no es más que O(NM). Puede ver mi discusión** con Jason Lenderman para leer sobre algunos pros y contras.
En el nivel muy alto debes considerar dos cosas:
Las transformaciones de Spark son perezosas, hasta que ejecutas una acción tu RDD no se materializa
¿Por qué eso importa? Volviendo a mi ejemplo:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Si luego decido que lo necesito sólo
rdd_odd
entonces no hay motivo para materializarlordd_even
.Si observa su ejemplo de SAS para calcular,
work.split2
necesita materializar tanto los datos de entrada como los archivoswork.split1
.Los RDD proporcionan una API declarativa. Cuándo usa
filter
omap
depende completamente del motor Spark cómo se realiza esta operación. Siempre que las funciones pasadas a las transformaciones estén libres de efectos secundarios, se crean múltiples posibilidades para optimizar todo un proceso.
Al fin y al cabo, este caso no es lo suficientemente especial como para justificar su propia transformación.
Este mapa con patrón de filtro en realidad se usa en un Spark central. Vea mi respuesta a ¿Cómo Sparks RDD.randomSplit realmente divide el RDD y una parte relevante del randomSplit
método?
Si el único objetivo es lograr una división en la entrada, es posible utilizar partitionBy
la cláusula para DataFrameWriter
el formato de salida de texto:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* Sólo hay 3 tipos básicos de transformaciones en Spark:
- RDD[T] => RDD[T]
- RDD[T] => RDD[U]
- (RDD[T], RDD[U]) => RDD[W]
donde T, U, W pueden ser tipos atómicos o productos /tuplas (K, V). Cualquier otra operación debe expresarse utilizando alguna combinación de las anteriores. Puede consultar el documento RDD original para obtener más detalles.
** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** Véase también Scala Spark: ¿Dividir la colección en varios RDD?
Como otros carteles mencionaron anteriormente, no existe una transformación RDD única y nativa que divida los RDD, pero aquí hay algunas operaciones "multiplex" que pueden emular eficientemente una amplia variedad de "división" en RDD, sin leer varias veces:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
Algunos métodos específicos para la división aleatoria:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
Los métodos están disponibles en el proyecto sílex de código abierto:
https://github.com/willb/silex
Una publicación de blog que explica cómo funcionan:
http://erikerlandson.github.io/blog/2016/02/08/ficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
Como se mencionó en otra parte, estos métodos implican un equilibrio entre memoria y velocidad, porque operan calculando los resultados de la partición completa "con entusiasmo" en lugar de "perezosamente". Por lo tanto, es posible que estos métodos tengan problemas de memoria en particiones grandes, donde las transformaciones diferidas más tradicionales no lo harán.