Llamar a la función Java/Scala desde una tarea

Resuelto zero323 asked hace 9 años • 1 respuestas

Fondo

Mi pregunta original aquí era ¿ Por qué el uso DecisionTreeModel.predictde la función de mapa interno genera una excepción? y está relacionado con ¿Cómo generar tuplas de (etiqueta original, etiqueta predicha) en Spark con MLlib?

Cuando utilizamos Scala API, una forma recomendada de obtener predicciones de RDD[LabeledPoint]uso DecisionTreeModeles simplemente mapear RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Lamentablemente, un enfoque similar en PySpark no funciona tan bien:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Excepción: parece que está intentando hacer referencia a SparkContext desde una variable, acción o transformación de transmisión. SparkContext solo se puede usar en el controlador, no en el código que se ejecuta en los trabajadores. Para obtener más información, consulte SPARK-5063 .

En lugar de eso, la documentación oficial recomienda algo como esto:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

Entonces, ¿qué está pasando aquí? Aquí no hay ninguna variable de transmisión y Scala API la define predictde la siguiente manera:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

por lo que, al menos a primera vista, llamar a una acción o transformación no es un problema, ya que la predicción parece ser una operación local.

Explicación

Después de investigar un poco, descubrí que la fuente del problema es un JavaModelWrapper.callmétodo invocado desde DecisionTreeModel.predict . Accede a SparkContext lo que se requiere para llamar a la función Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pregunta

En caso de que DecisionTreeModel.predictexista una solución alternativa recomendada y todo el código requerido ya forme parte de la API de Scala, pero ¿existe alguna forma elegante de manejar un problema como este en general?

Las únicas soluciones que se me ocurren ahora son bastante pesadas:

  • empujando todo a JVM, ya sea extendiendo las clases de Spark a través de conversiones implícitas o agregando algún tipo de contenedores
  • usando la puerta de enlace Py4j directamente
zero323 avatar Jul 29 '15 01:07 zero323
Aceptado

La comunicación mediante la puerta de enlace Py4J predeterminada simplemente no es posible. Para entender por qué tenemos que echar un vistazo al siguiente diagrama del documento PySpark Internals [1]:

ingrese la descripción de la imagen aquí

Dado que la puerta de enlace Py4J se ejecuta en el controlador, no es accesible para los intérpretes de Python que se comunican con los trabajadores de JVM a través de sockets (consulte, por ejemplo, PythonRDD/ rdd.py).

En teoría, sería posible crear una puerta de enlace Py4J independiente para cada trabajador, pero en la práctica es poco probable que resulte útil. Ignorando cuestiones como la confiabilidad, Py4J simplemente no está diseñado para realizar tareas intensivas en datos.

¿Hay alguna solución?

  1. Uso de la API Spark SQL Data Sources para empaquetar código JVM.

    Ventajas : compatible, de alto nivel, no requiere acceso a la API interna de PySpark

    Desventajas : Relativamente detallado y no muy bien documentado, limitado principalmente a los datos de entrada.

  2. Operando en DataFrames usando Scala UDF.

    Ventajas : Fácil de implementar (consulte Spark: ¿Cómo asignar Python con funciones definidas por el usuario de Scala o Java? ), sin conversión de datos entre Python y Scala si los datos ya están almacenados en un DataFrame, acceso mínimo a Py4J

    Desventajas : Requiere acceso a la puerta de enlace Py4J y a métodos internos, limitado a Spark SQL, difícil de depurar, no compatible

  3. Crear una interfaz Scala de alto nivel de forma similar a como se hace en MLlib.

    Ventajas : Flexible, capacidad de ejecutar código complejo arbitrario. Se puede realizar directamente en RDD (ver, por ejemplo , envoltorios de modelos MLlib ) o con DataFrames(ver Cómo usar una clase Scala dentro de Pyspark ). La última solución parece ser mucho más amigable ya que todos los detalles del servicio ya son manejados por la API existente.

    Desventajas : nivel bajo, conversión de datos requerida, al igual que las UDF, requiere acceso a Py4J y API interna, no compatible

    Se pueden encontrar algunos ejemplos básicos en Transformación de PySpark RDD con Scala.

  4. Usar una herramienta externa de administración de flujo de trabajo para cambiar entre trabajos de Python y Scala/Java y pasar datos a un DFS.

    Ventajas : Fácil de implementar, cambios mínimos en el código mismo

    Desventajas : Costo de lectura/escritura de datos (¿ Alluxio ?)

  5. Usar compartido SQLContext(ver, por ejemplo, Apache Zeppelin o Livy ) para pasar datos entre idiomas invitados mediante tablas temporales registradas.

    Ventajas : Muy adecuado para análisis interactivos.

    Desventajas : No tanto para trabajos por lotes (Zeppelin) o puede requerir orquestación adicional (Livy)


  1. Josué Rosen. (4 de agosto de 2014) Componentes internos de PySpark . Obtenido de https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
zero323 avatar Dec 22 '2015 09:12 zero323