¿Funciones Spark versus rendimiento UDF?
Spark ahora ofrece funciones predefinidas que se pueden usar en marcos de datos y parece que están altamente optimizadas. Mi pregunta original iba a ser cuál es más rápido, pero hice algunas pruebas y descubrí que las funciones Spark son aproximadamente 10 veces más rápidas al menos en un caso. ¿Alguien sabe por qué esto es así y cuándo sería más rápido un udf (solo en los casos en que existe una función de chispa idéntica)?
Aquí está mi código de prueba (ejecutado en la educación comunitaria de Databricks):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
Función UDF:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Función de chispa:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
Se ejecutaron ambas veces varias veces, la udf normalmente tardaba entre 1,1 y 1,4 s y la concat
función Spark siempre tardaba menos de 0,15 s.
¿Cuándo sería más rápido un UDF?
Si preguntas sobre Python UDF, la respuesta probablemente sea nunca*. Dado que las funciones SQL son relativamente simples y no están diseñadas para tareas complejas, es prácticamente imposible compensar el costo de la serialización, deserialización y movimiento de datos repetidos entre el intérprete de Python y la JVM.
¿Alguien sabe por qué esto es así?
Las razones principales ya se enumeraron anteriormente y se pueden reducir al simple hecho de que Spark DataFrame
es de forma nativa una estructura JVM y los métodos de acceso estándar se implementan mediante simples llamadas a la API de Java. Por otro lado, las UDF se implementan en Python y requieren mover datos de un lado a otro.
Si bien PySpark en general requiere movimientos de datos entre JVM y Python, en el caso de una API RDD de bajo nivel, generalmente no requiere una actividad de servicio costosa. Spark SQL agrega costos adicionales de serialización y serialización, así como el costo de mover datos desde y hacia una representación insegura en JVM. El último es específico de todas las UDF (Python, Scala y Java), pero el primero es específico de lenguajes no nativos.
A diferencia de las UDF, las funciones de Spark SQL operan directamente en JVM y normalmente están bien integradas tanto con Catalyst como con Tungsten. Significa que estos se pueden optimizar en el plan de ejecución y la mayoría de las veces pueden beneficiarse de coden y otras optimizaciones de Tungsteno. Además, estos pueden operar con datos en su representación "nativa".
Entonces, en cierto sentido, el problema aquí es que Python UDF tiene que traer datos al código, mientras que las expresiones SQL van al revés.
* Según estimaciones aproximadas, la ventana UDF de PySpark puede superar la función de ventana de Scala.
Después de años, cuando tuve un conocimiento más profundo y revisé la pregunta, me di cuenta de lo que @alfredox realmente quiere preguntar. Así que revisé nuevamente y dividí la respuesta en dos partes:
Para responder por qué la función DF nativa (función Spark-SQL nativa) es más rápida:
Básicamente, por qué la función nativa de Spark es SIEMPRE más rápida que Spark UDF, independientemente de que su UDF esté implementada en Python o Scala.
En primer lugar, debemos comprender qué es el tungsteno , que se introdujo por primera vez en Spark 1.4 .
Es un backend y en qué se centra:
- Gestión de memoria fuera del montón mediante representación de datos binarios en memoria, también conocido como formato de fila de tungsteno, y gestión de la memoria explícitamente.
- Cache Locality, que trata sobre cálculos con reconocimiento de caché con diseño con reconocimiento de caché para altas tasas de aciertos de caché,
- Generación de código de etapa completa (también conocido como CodeGen).
Uno de los mayores asesinos del rendimiento de Spark es GC. El GC pausaría todos los subprocesos en JVM hasta que finalizara el GC. Esta es exactamente la razón por la que se introduce la gestión de memoria fuera del montón.
Al ejecutar funciones nativas de Spark-SQL, los datos permanecerán en el backend de tungsteno. Sin embargo, en el escenario Spark UDF, los datos se trasladarán de tungsteno a JVM (escenario Scala) o JVM y Python Process (Python) para realizar el proceso real, y luego volverán a tungsteno. Como resultado de eso:
- Inevitablemente, habría un gasto general/penalización en:
- Deserialice la entrada de tungsteno.
- Serializar la salida nuevamente en tungsteno.
- Incluso usando Scala, el ciudadano de primera clase en Spark, aumentará la huella de memoria dentro de JVM, lo que probablemente implique más GC dentro de JVM. Este problema es exactamente lo que intenta solucionar la función de tungsteno "Administración de memoria fuera del montón" .
Para responder si Python sería necesariamente más lento que Scala:
Desde el 30 de octubre de 2017, Spark acaba de presentar udfs vectorizados para pyspark.
https://databricks.com/blog/2017/10/30/introduciendo-vectorizado-udfs-for-pyspark.html
La razón por la que Python UDF es lento es probablemente que PySpark UDF no está implementado de la manera más optimizada:
Según el párrafo del enlace.
Spark agregó una API de Python en la versión 0.7, con soporte para funciones definidas por el usuario. Estas funciones definidas por el usuario operan una fila a la vez y, por lo tanto, sufren una alta sobrecarga de serialización e invocación.
Sin embargo, las udfs recientemente vectorizadas parecen estar mejorando mucho el rendimiento:
que van desde 3x hasta más de 100x.
Utilice las funciones basadas en columnas estándar de nivel superior con operadores de conjunto de datos siempre que sea posible antes de volver a utilizar sus propias funciones UDF personalizadas, ya que las UDF son un BlackBox para Spark y, por lo tanto, ni siquiera intenta optimizarlas.
Lo que realmente sucede detrás de las pantallas es que Catalyst no puede procesar ni optimizar las UDF en absoluto y las trata como BlackBox, lo que resulta en la pérdida de muchas optimizaciones como Predicate pushdown, Constant Folding y muchas otras.