Tarea no serializable: java.io.NotSerializableException cuando se llama a una función fuera del cierre solo en clases, no en objetos
Obteniendo un comportamiento extraño al llamar a una función fuera de un cierre:
- cuando la función está en un objeto, todo funciona
- cuando la función está en una clase obtiene:
Tarea no serializable: java.io.NotSerializableException: prueba
El problema es que necesito mi código en una clase y no en un objeto. ¿Alguna idea de por qué sucede esto? ¿Está serializado un objeto de Scala (¿predeterminado?)?
Este es un ejemplo de código funcional:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Este es el ejemplo que no funciona:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
Los RDD amplían la interfaz serializable , por lo que esto no es lo que causa que su tarea falle. Ahora bien, esto no significa que puedas serializar un RDD
con Spark y evitarNotSerializableException
Spark es un motor informático distribuido y su principal abstracción es un conjunto de datos distribuido resiliente ( RDD ), que puede verse como una colección distribuida. Básicamente, los elementos de RDD se dividen entre los nodos del clúster, pero Spark abstrae esto del usuario, permitiéndole interactuar con el RDD (colección) como si fuera local.
No quiero entrar en demasiados detalles, pero cuando ejecuta diferentes transformaciones en un RDD ( map
, y otros), su código de transformación (cierre) es flatMap
:filter
- serializado en el nodo del controlador,
- enviado a los nodos apropiados en el clúster,
- deserializado,
- y finalmente ejecutado en los nodos
Por supuesto, puede ejecutar esto localmente (como en su ejemplo), pero todas esas fases (aparte del envío a través de la red) aún ocurren. [Esto le permite detectar cualquier error incluso antes de implementarlo en producción]
Lo que sucede en su segundo caso es que está llamando a un método, definido en la clase testing
desde dentro de la función de mapa. Spark ve eso y dado que los métodos no se pueden serializar por sí solos, Spark intenta serializar toda la testing
clase, de modo que el código siga funcionando cuando se ejecute en otra JVM. Tienes dos posibilidades:
O hace que las pruebas de clase sean serializables, de modo que Spark pueda serializar toda la clase:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
o creas someFunc
una función en lugar de un método (las funciones son objetos en Scala), para que Spark pueda serializarlo:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Un problema similar, pero no el mismo, con la serialización de clases puede ser de su interés y puede leerlo en esta presentación de Spark Summit 2013 .
Como nota al margen, puedes reescribir rddList.map(someFunc(_))
en rddList.map(someFunc)
, son exactamente iguales. Por lo general, se prefiere el segundo porque es menos detallado y más limpio de leer.
EDITAR (15 de marzo de 2015): SPARK-5307 introdujo SerializationDebugger y Spark 1.3.0 es la primera versión que lo usa. Agrega una ruta de serialización a NotSerializableException . Cuando se encuentra una excepción NotSerializable, el depurador visita el gráfico del objeto para encontrar la ruta hacia el objeto que no se puede serializar y construye información para ayudar al usuario a encontrar el objeto.
En el caso de OP, esto es lo que se imprime en la salida estándar:
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
La respuesta de Grega es excelente al explicar por qué el código original no funciona y dos formas de solucionar el problema. Sin embargo, esta solución no es muy flexible; considere el caso en el que su cierre incluye una llamada a un método en una Serializable
clase que no es sobre la que no tiene control. No puede agregar la Serializable
etiqueta a esta clase ni cambiar la implementación subyacente para convertir el método en una función.
Nilesh presenta una excelente solución para esto, pero la solución puede hacerse más concisa y general:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
Este serializador de funciones se puede utilizar para ajustar automáticamente cierres y llamadas a métodos:
rdd map genMapper(someFunc)
Esta técnica también tiene la ventaja de no requerir dependencias adicionales de Shark para poder acceder KryoSerializationWrapper
, ya que el núcleo Spark ya incorpora Chill de Twitter.