¿Cómo definir la partición de DataFrame?
Comencé a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacerlo.
Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, similar al siguiente ejemplo.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Al menos inicialmente, la mayoría de los cálculos se realizarán entre las transacciones dentro de una cuenta. Por lo tanto, me gustaría particionar los datos para que todas las transacciones de una cuenta estén en la misma partición Spark.
Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado 'repartición (Int)', donde puede especificar la cantidad de particiones a crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como el que se puede especificar para un RDD.
Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame en Parquet, puede especificar una columna para particionar, por lo que presumiblemente podría decirle a Parquet que particione sus datos mediante la columna 'Cuenta'. Pero podría haber millones de cuentas y, si entiendo Parquet correctamente, crearía un directorio distinto para cada cuenta, por lo que no parecía una solución razonable.
¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una cuenta estén en la misma partición?
Chispa >= 2.3.0
SPARK-22614 expone la partición del rango.
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389 expone la partición de formato externo en la API de origen de datos v2 .
Chispa >= 1.6.0
En Spark >= 1.6 es posible utilizar la partición por columna para consultas y almacenamiento en caché. Ver: SPARK-11410 y SPARK-4849 usando repartition
el método:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
A diferencia RDDs
de Spark Dataset
(incluido Dataset[Row]
también conocido como DataFrame
), no puede usar un particionador personalizado por ahora. Normalmente, puedes solucionar este problema creando una columna de partición artificial, pero no te dará la misma flexibilidad.
Chispa < 1.6.0:
Una cosa que puedes hacer es realizar una partición previa de los datos de entrada antes de crear unDataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
Dado que DataFrame
la creación a partir de una RDD
requiere solo una fase de mapa simple, se debe conservar el diseño de la partición existente*:
assert(df.rdd.partitions == partitioned.partitions)
De la misma manera que puedes reparticionar lo existente DataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
Entonces parece que no es imposible. La pregunta sigue siendo si tiene algún sentido. Sostendré que la mayoría de las veces no es así:
La repartición es un proceso costoso. En un escenario típico, la mayoría de los datos deben serializarse, barajarse y deserializarse. Por otro lado, la cantidad de operaciones que pueden beneficiarse de datos preparticionados es relativamente pequeña y está aún más limitada si la API interna no está diseñada para aprovechar esta propiedad.
- se une en algunos escenarios, pero requeriría un apoyo interno,
- llamadas a funciones de ventana con particionador coincidente. Igual que el anterior, limitado a una definición de ventana única. Sin embargo, ya está particionado internamente, por lo que la partición previa puede ser redundante.
- agregaciones simples con
GROUP BY
: es posible reducir el uso de memoria de los buffers temporales**, pero el costo general es mucho mayor. Más o menos equivalente agroupByKey.mapValues(_.reduce)
(comportamiento actual) versusreduceByKey
(particionamiento previo). Es poco probable que sea útil en la práctica. - compresión de datos con
SqlContext.cacheTable
. Dado que parece que está utilizando codificación de longitud de ejecución, la aplicaciónOrderedRDDFunctions.repartitionAndSortWithinPartitions
podría mejorar la relación de compresión.
El rendimiento depende en gran medida de la distribución de las claves. Si está sesgado, el resultado será una utilización subóptima de los recursos. En el peor de los casos, será imposible terminar el trabajo.
- El objetivo de utilizar una API declarativa de alto nivel es aislarse de los detalles de implementación de bajo nivel. Como ya mencionaron @dwysakowicz y @RomiKuntsman, una optimización es un trabajo de Catalyst Optimizer . Es una bestia bastante sofisticada y realmente dudo que puedas mejorarla fácilmente sin profundizar mucho más en sus aspectos internos.
Conceptos relacionados
Partición con fuentes JDBC :
Las fuentes de datos JDBC respaldan predicates
el argumento . Se puede utilizar de la siguiente manera:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Crea una única partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados utilizando predicados individuales no están separados, verá duplicados en la tabla resultante.
partitionBy
método enDataFrameWriter
:
Spark DataFrameWriter
proporciona partitionBy
un método que se puede utilizar para "particionar" datos en escritura. Separa los datos al escribir utilizando el conjunto de columnas proporcionado.
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
Esto permite presionar el predicado durante la lectura para consultas basadas en la clave:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
pero no es equivalente a DataFrame.repartition
. En particular agregaciones como:
val cnts = df1.groupBy($"k").sum()
todavía requerirá TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
método enDataFrameWriter
(Spark >= 2.0):
bucketBy
Tiene aplicaciones similares partitionBy
pero está disponible solo para tablas ( saveAsTable
). La información de agrupación se puede utilizar para optimizar las uniones:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* Por diseño de partición me refiero sólo a una distribución de datos. partitioned
RDD ya no tiene particionador. ** Suponiendo que no haya proyección temprana. Si la agregación cubre sólo un pequeño subconjunto de columnas, probablemente no se obtenga beneficio alguno.
En Spark <1.6 Si crea un HiveContext
, no el antiguo, SqlContext
puede usar HiveQL DISTRIBUTE BY colX...
(garantiza que cada uno de los N reductores obtenga rangos de x que no se superpongan) y CLUSTER BY colX...
(atajo para Distribuir por y Ordenar por), por ejemplo;
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
No estoy seguro de cómo encaja esto con la API de Spark DF. Estas palabras clave no son compatibles con el SqlContext normal (tenga en cuenta que no necesita tener un meta almacén de Hive para usar HiveContext)
EDITAR: Spark 1.6+ ahora tiene esto en la API nativa de DataFrame
Entonces, para comenzar con algún tipo de respuesta :) - No puedes
No soy un experto, pero hasta donde yo entiendo los DataFrames, no son iguales a rdd y DataFrame no tiene nada parecido a Partitioner.
Generalmente, la idea de DataFrame es proporcionar otro nivel de abstracción que maneje estos problemas por sí mismo. Las consultas en DataFrame se traducen en un plan lógico que luego se traduce en operaciones en RDD. La partición que sugirió probablemente se aplicará automáticamente o al menos debería serlo.
Si no confía en que SparkSQL proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD[Row] como se sugiere en los comentarios.
Pude hacer esto usando RDD. Pero no sé si esta es una solución aceptable para usted. Una vez que tenga el DF disponible como RDD, puede solicitar repartitionAndSortWithinPartitions
realizar una repartición personalizada de datos.
Aquí hay una muestra que utilicé:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)