¿Cómo definir la partición de DataFrame?

Resuelto rake asked hace 9 años • 5 respuestas

Comencé a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacerlo.

Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, similar al siguiente ejemplo.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Al menos inicialmente, la mayoría de los cálculos se realizarán entre las transacciones dentro de una cuenta. Por lo tanto, me gustaría particionar los datos para que todas las transacciones de una cuenta estén en la misma partición Spark.

Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado 'repartición (Int)', donde puede especificar la cantidad de particiones a crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como el que se puede especificar para un RDD.

Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame en Parquet, puede especificar una columna para particionar, por lo que presumiblemente podría decirle a Parquet que particione sus datos mediante la columna 'Cuenta'. Pero podría haber millones de cuentas y, si entiendo Parquet correctamente, crearía un directorio distinto para cada cuenta, por lo que no parecía una solución razonable.

¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una cuenta estén en la misma partición?

rake avatar Jun 23 '15 13:06 rake
Aceptado

Chispa >= 2.3.0

SPARK-22614 expone la partición del rango.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 expone la partición de formato externo en la API de origen de datos v2 .

Chispa >= 1.6.0

En Spark >= 1.6 es posible utilizar la partición por columna para consultas y almacenamiento en caché. Ver: SPARK-11410 y SPARK-4849 usando repartitionel método:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

A diferencia RDDsde Spark Dataset(incluido Dataset[Row]también conocido como DataFrame), no puede usar un particionador personalizado por ahora. Normalmente, puedes solucionar este problema creando una columna de partición artificial, pero no te dará la misma flexibilidad.

Chispa < 1.6.0:

Una cosa que puedes hacer es realizar una partición previa de los datos de entrada antes de crear unDataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Dado que DataFramela creación a partir de una RDDrequiere solo una fase de mapa simple, se debe conservar el diseño de la partición existente*:

assert(df.rdd.partitions == partitioned.partitions)

De la misma manera que puedes reparticionar lo existente DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Entonces parece que no es imposible. La pregunta sigue siendo si tiene algún sentido. Sostendré que la mayoría de las veces no es así:

  1. La repartición es un proceso costoso. En un escenario típico, la mayoría de los datos deben serializarse, barajarse y deserializarse. Por otro lado, la cantidad de operaciones que pueden beneficiarse de datos preparticionados es relativamente pequeña y está aún más limitada si la API interna no está diseñada para aprovechar esta propiedad.

    • se une en algunos escenarios, pero requeriría un apoyo interno,
    • llamadas a funciones de ventana con particionador coincidente. Igual que el anterior, limitado a una definición de ventana única. Sin embargo, ya está particionado internamente, por lo que la partición previa puede ser redundante.
    • agregaciones simples con GROUP BY: es posible reducir el uso de memoria de los buffers temporales**, pero el costo general es mucho mayor. Más o menos equivalente a groupByKey.mapValues(_.reduce)(comportamiento actual) versus reduceByKey(particionamiento previo). Es poco probable que sea útil en la práctica.
    • compresión de datos con SqlContext.cacheTable. Dado que parece que está utilizando codificación de longitud de ejecución, la aplicación OrderedRDDFunctions.repartitionAndSortWithinPartitionspodría mejorar la relación de compresión.
  2. El rendimiento depende en gran medida de la distribución de las claves. Si está sesgado, el resultado será una utilización subóptima de los recursos. En el peor de los casos, será imposible terminar el trabajo.

  3. El objetivo de utilizar una API declarativa de alto nivel es aislarse de los detalles de implementación de bajo nivel. Como ya mencionaron @dwysakowicz y @RomiKuntsman, una optimización es un trabajo de Catalyst Optimizer . Es una bestia bastante sofisticada y realmente dudo que puedas mejorarla fácilmente sin profundizar mucho más en sus aspectos internos.

Conceptos relacionados

Partición con fuentes JDBC :

Las fuentes de datos JDBC respaldan predicatesel argumento . Se puede utilizar de la siguiente manera:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Crea una única partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados utilizando predicados individuales no están separados, verá duplicados en la tabla resultante.

partitionBymétodo enDataFrameWriter :

Spark DataFrameWriterproporciona partitionByun método que se puede utilizar para "particionar" datos en escritura. Separa los datos al escribir utilizando el conjunto de columnas proporcionado.

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Esto permite presionar el predicado durante la lectura para consultas basadas en la clave:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

pero no es equivalente a DataFrame.repartition. En particular agregaciones como:

val cnts = df1.groupBy($"k").sum()

todavía requerirá TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBymétodo enDataFrameWriter (Spark >= 2.0):

bucketByTiene aplicaciones similares partitionBypero está disponible solo para tablas ( saveAsTable). La información de agrupación se puede utilizar para optimizar las uniones:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* Por diseño de partición me refiero sólo a una distribución de datos. partitionedRDD ya no tiene particionador. ** Suponiendo que no haya proyección temprana. Si la agregación cubre sólo un pequeño subconjunto de columnas, probablemente no se obtenga beneficio alguno.

zero323 avatar Oct 03 '2015 07:10 zero323

En Spark <1.6 Si crea un HiveContext, no el antiguo, SqlContextpuede usar HiveQL DISTRIBUTE BY colX... (garantiza que cada uno de los N reductores obtenga rangos de x que no se superpongan) y CLUSTER BY colX...(atajo para Distribuir por y Ordenar por), por ejemplo;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

No estoy seguro de cómo encaja esto con la API de Spark DF. Estas palabras clave no son compatibles con el SqlContext normal (tenga en cuenta que no necesita tener un meta almacén de Hive para usar HiveContext)

EDITAR: Spark 1.6+ ahora tiene esto en la API nativa de DataFrame

NightWolf avatar Aug 10 '2015 04:08 NightWolf

Entonces, para comenzar con algún tipo de respuesta :) - No puedes

No soy un experto, pero hasta donde yo entiendo los DataFrames, no son iguales a rdd y DataFrame no tiene nada parecido a Partitioner.

Generalmente, la idea de DataFrame es proporcionar otro nivel de abstracción que maneje estos problemas por sí mismo. Las consultas en DataFrame se traducen en un plan lógico que luego se traduce en operaciones en RDD. La partición que sugirió probablemente se aplicará automáticamente o al menos debería serlo.

Si no confía en que SparkSQL proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD[Row] como se sugiere en los comentarios.

Dawid Wysakowicz avatar Sep 29 '2015 20:09 Dawid Wysakowicz

Pude hacer esto usando RDD. Pero no sé si esta es una solución aceptable para usted. Una vez que tenga el DF disponible como RDD, puede solicitar repartitionAndSortWithinPartitionsrealizar una repartición personalizada de datos.

Aquí hay una muestra que utilicé:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
Developer avatar Oct 02 '2015 17:10 Developer