To get past this, you can move the mapper creation code into the closure.
It's then created on the worker node, so it doesn't need to be serialized.
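// Parse each record into a specific case class. We use flatMap to handle
// errors: it returns an empty result (None) if we hit a problem, and a
// single element (Some(_)) if everything is OK.
val result = input.flatMap { record =>
  try {
    // Created inside the closure, so it is built on the worker and never serialized.
    val mapper = new ObjectMapper with ScalaObjectMapper
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)
    Some(mapper.readValue(record, classOf[Company]))
  } catch {
    case e: Exception => None
  }
}
// The mapper above is local to the flatMap closure, so writing the
// results back out needs its own mapper, again created inside the closure.
result.map { company =>
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  mapper.writeValueAsString(company)
}.saveAsTextFile(outputFile)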
}
}
But for more efficiency, look into creating the mapper inside *mapPartitions*.
It is still created on the worker node, but only once per partition rather
than once per record as above.
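A minimal sketch of the mapPartitions approach, assuming Spark's RDD API and jackson-module-scala are on the classpath. `Company` here is a hypothetical two-field stand-in for the case class in this thread, and `CompanyJson`, `parseAndWrite` and `run` are illustrative names. It uses a plain `ObjectMapper` with `DefaultScalaModule`, since `readValue(record, classOf[Company])` doesn't need the `ScalaObjectMapper` mix-in.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the Company case class from the thread.
case class Company(name: String, employees: Int)

// Illustrative helper object (hypothetical name); Serializable in case the
// closure passed to mapPartitions ends up referencing it.
object CompanyJson extends Serializable {

  // Called once per partition: a single ObjectMapper is built on the worker
  // and reused for every record in that partition's iterator.
  def parseAndWrite(records: Iterator[String]): Iterator[String] = {
    val mapper = new ObjectMapper()
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    mapper.registerModule(DefaultScalaModule)

    records.flatMap { record =>
      try {
        // Parse into the case class, then serialize it back to JSON.
        val company = mapper.readValue(record, classOf[Company])
        Some(mapper.writeValueAsString(company))
      } catch {
        // Malformed records are dropped, as in the per-record version.
        case _: Exception => None
      }
    }
  }

  def run(input: RDD[String], outputFile: String): Unit =
    input.mapPartitions(parseAndWrite).saveAsTextFile(outputFile)
}
```

Keeping the per-partition logic as a plain `Iterator[String] => Iterator[String]` function also means it can be exercised without a SparkContext, for example `CompanyJson.parseAndWrite(Iterator("""{"name":"Acme","employees":10}""")).toList`.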
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/SparkException-Task-not-serializable-Jackson-Json-tp21347p21655.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.