Spark SQL: aplica funciones agregadas a una lista de columnas

Resuelto lilloraffa asked hace 8 años • 4 respuestas

¿Hay alguna manera de aplicar una función agregada a todas (o a una lista de) columnas de un marco de datos al realizar un groupBy? En otras palabras, ¿hay alguna manera de evitar hacer esto para cada columna?

df.groupBy("col1")
  .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)
lilloraffa avatar Nov 24 '15 06:11 lilloraffa
Aceptado

Hay varias formas de aplicar funciones agregadas a varias columnas.

GroupedDataLa clase proporciona una serie de métodos para las funciones más comunes, incluidas count, max, y , que se pueden usar directamente de la siguiente manera min:meansum

  • Pitón:

    df = sqlContext.createDataFrame(
        [(1.0, 0.3, 1.0), (1.0, 0.5, 0.0), (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2)],
        ("col1", "col2", "col3"))
    
    df.groupBy("col1").sum()
    
    ## +----+---------+-----------------+---------+
    ## |col1|sum(col1)|        sum(col2)|sum(col3)|
    ## +----+---------+-----------------+---------+
    ## | 1.0|      2.0|              0.8|      1.0|
    ## |-1.0|     -2.0|6.199999999999999|      0.7|
    ## +----+---------+-----------------+---------+
    
  • escala

    val df = sc.parallelize(Seq(
      (1.0, 0.3, 1.0), (1.0, 0.5, 0.0),
      (-1.0, 0.6, 0.5), (-1.0, 5.6, 0.2))
    ).toDF("col1", "col2", "col3")
    
    df.groupBy($"col1").min().show
    
    // +----+---------+---------+---------+
    // |col1|min(col1)|min(col2)|min(col3)|
    // +----+---------+---------+---------+
    // | 1.0|      1.0|      0.3|      0.0|
    // |-1.0|     -1.0|      0.6|      0.2|
    // +----+---------+---------+---------+
    

Opcionalmente, puede pasar una lista de columnas que deben agregarse

df.groupBy("col1").sum("col2", "col3")

También puede pasar diccionario/mapa con columnas y claves y funciones como valores:

  • Pitón

    exprs = {x: "sum" for x in df.columns}
    df.groupBy("col1").agg(exprs).show()
    
    ## +----+---------+
    ## |col1|avg(col3)|
    ## +----+---------+
    ## | 1.0|      0.5|
    ## |-1.0|     0.35|
    ## +----+---------+
    
  • escala

    val exprs = df.columns.map((_ -> "mean")).toMap
    df.groupBy($"col1").agg(exprs).show()
    
    // +----+---------+------------------+---------+
    // |col1|avg(col1)|         avg(col2)|avg(col3)|
    // +----+---------+------------------+---------+
    // | 1.0|      1.0|               0.4|      0.5|
    // |-1.0|     -1.0|3.0999999999999996|     0.35|
    // +----+---------+------------------+---------+
    

Finalmente puedes usar varargs:

  • Pitón

    from pyspark.sql.functions import min
    
    exprs = [min(x) for x in df.columns]
    df.groupBy("col1").agg(*exprs).show()
    
  • escala

    import org.apache.spark.sql.functions.sum
    
    val exprs = df.columns.map(sum(_))
    df.groupBy($"col1").agg(exprs.head, exprs.tail: _*)
    

Hay otras formas de lograr un efecto similar, pero éstas deberían ser más que suficientes la mayor parte del tiempo.

Ver también:

  • Múltiples operaciones agregadas en la misma columna de un marco de datos Spark
zero323 avatar Nov 25 '2015 02:11 zero323

Otro ejemplo del mismo concepto, pero digamos: tiene 2 columnas diferentes y desea aplicar diferentes funciones agregadas a cada una de ellas, es decir

f.groupBy("col1").agg(sum("col2").alias("col2"), avg("col3").alias("col3"), ...)

Esta es la forma de lograrlo, aunque todavía no sé cómo agregar el alias en este caso.

Vea el siguiente ejemplo: Uso de mapas

val Claim1 = StructType(Seq(StructField("pid", StringType, true),StructField("diag1", StringType, true),StructField("diag2", StringType, true), StructField("allowed", IntegerType, true), StructField("allowed1", IntegerType, true)))
val claimsData1 = Seq(("PID1", "diag1", "diag2", 100, 200), ("PID1", "diag2", "diag3", 300, 600), ("PID1", "diag1", "diag5", 340, 680), ("PID2", "diag3", "diag4", 245, 490), ("PID2", "diag2", "diag1", 124, 248))

val claimRDD1 = sc.parallelize(claimsData1)
val claimRDDRow1 = claimRDD1.map(p => Row(p._1, p._2, p._3, p._4, p._5))
val claimRDD2DF1 = sqlContext.createDataFrame(claimRDDRow1, Claim1)

val l = List("allowed", "allowed1")
val exprs = l.map((_ -> "sum")).toMap
claimRDD2DF1.groupBy("pid").agg(exprs) show false
val exprs = Map("allowed" -> "sum", "allowed1" -> "avg")

claimRDD2DF1.groupBy("pid").agg(exprs) show false
Sumit Pal avatar Jul 07 '2016 19:07 Sumit Pal