Hi Darshan,

When you get org.apache.spark.SparkException: Task not serializable, it means you are referencing an instance of a non-serializable class inside a transformation. Spark has to serialize the closure it ships to the executors, so everything that closure captures must be serializable as well. I hope the following link helps:
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
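In your trace, the closure is dragging in the enclosing object (com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$) through the $outer field of the anonymous function, because getNewColumnName is a method on that enclosing scope. As a minimal sketch of one common fix (the object name ColumnNameUtils is my own illustration, not from your code), move the logic into a standalone top-level object so the UDF captures nothing from the app class:

    import org.apache.spark.sql.functions.udf

    // A standalone, top-level object: calling it from a closure does not
    // pull in an enclosing class via an $outer reference.
    object ColumnNameUtils extends Serializable {
      def getNewColumnName(oldColName: String, appendID: Boolean): String =
        oldColName
          .replaceAll("\\s", "_")
          .replaceAll("%", "_pct")
          .replaceAllLiterally("#", "No")
    }

    // Wrap the call in a function literal so only the object above is referenced.
    val correctColNameUDF =
      udf((name: String) => ColumnNameUtils.getNewColumnName(name, false))

The same rule applies to anything else the UDF body touches: keep it free of fields and methods of classes you cannot (or do not want to) make serializable.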
Regards,
Vaquar Khan

On Fri, Feb 17, 2017 at 9:36 PM, Darshan Pandya <darshanpan...@gmail.com> wrote:
> Hello,
>
> I am getting the famous serialization exception on running some code as
> below,
>
> val correctColNameUDF = udf(getNewColumnName(_: String, false: Boolean): String);
> val charReference: DataFrame = thinLong
>   .select("char_name_id", "char_name")
>   .withColumn("columnNameInDimTable", correctColNameUDF(col("char_name")))
>   .withColumn("applicable_dimension", lit(dimension).cast(StringType))
>   .distinct();
> val charReferenceTableName: String = s"""$TargetSFFDBName.fg_char_reference"""
> val tableName: String = charReferenceTableName.toString
> charReference.saveAsTable(tableName, saveMode)
>
> I think it has something to do with the UDF, so I am pasting the UDF
> function as well
>
> def getNewColumnName(oldColName: String, appendID: Boolean): String = {
>   var newColName = oldColName.replaceAll("\\s", "_")
>     .replaceAll("%", "_pct").replaceAllLiterally("#", "No")
>   return newColName;
> }
>
> Exception seen is
>
> Caused by: org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2066)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>   at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:86)
>   at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:80)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>   ... 73 more
> Caused by: java.io.NotSerializableException: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$
> Serialization stack:
>   - object not serializable (class: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$, value: com.nielsen.datamodel.converters.cip2sff.ProductDimensionSFFConverterRealApp$@247a8411)
>   - field (class: com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, name: $outer, type: interface com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits)
>   - object (class com.nielsen.datamodel.converters.cip2sff.CommonTransformationTraits$$anonfun$1, <function1>)
>   - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
>   - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
>   - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
>   - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(char_name#3))
>   - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
>   - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(char_name#3) AS columnNameInDimTable#304)
>   - element of array (index: 2)
>   - array (class [Ljava.lang.Object;, size 4)
>   - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
>   - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305))
>   - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
>   - object (class org.apache.spark.sql.execution.Project, Project [char_name_id#2, char_name#3, UDF(char_name#3) AS columnNameInDimTable#304, PRODUCT AS applicable_dimension#305])
>
> --
> Sincerely,
> Darshan

--
Regards,
Vaquar Khan
+1 224-436-0783
IT Architect / Lead Consultant
Greater Chicago