Hi all,
I am trying to define a simple UDF and use it in a query, but it fails with
"Task not serializable":
public void test() {
    RiskGroupModelDefinition model = registeredRiskGroupMap.get(this.modelId);
    RiskGroupModelDefinition edm = this.createEdm();
    JavaSparkContext ctx = this.createSparkContext();
    SQLContext sql = new SQLContext(ctx);

    sql.udf().register("year", new UDF1<Date, Integer>() {
        @Override
        public Integer call(Date d) throws Exception {
            return d.getYear();
        }
    }, new org.apache.spark.sql.types.IntegerType());

    // Retrieve all tables for the EDM
    DataFrame property = sql.parquetFile(edm.toS3nPath("property")).filter("ISVALID = 1");
    property.registerTempTable("p");
    DataFrame yb_lookup = sql.parquetFile(model.toS3nPath("yb_lookup")).as("yb");
    yb_lookup.registerTempTable("yb");

    sql.sql("select * from p left join yb on year(p.YEARBUILT) = yb.yb_class_vdm").count();
    ctx.stop();
}
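My only lead so far is that the UDF above is an anonymous inner class, and in
Java an anonymous inner class keeps an implicit reference to its enclosing
instance, so Spark would have to serialize the whole enclosing executor object
along with the UDF. For reference, here is a minimal sketch of the same UDF as
a static nested class that should avoid that capture (assuming the Spark 1.3
Java API and a java.sql.Date input column; the name YearUdf and the use of
DataTypes.IntegerType are my own, not from the code above):

    import java.io.Serializable;
    import java.sql.Date;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;

    // A static nested (or top-level) class captures no reference to the
    // enclosing instance, so only this small object gets serialized and
    // shipped to the executors (UDF1 itself already extends Serializable).
    public static class YearUdf implements UDF1<Date, Integer>, Serializable {
        @Override
        public Integer call(Date d) throws Exception {
            return d.getYear(); // note: Date#getYear returns the year minus 1900
        }
    }

    // ... and inside test(), registered the same way; DataTypes.IntegerType
    // is the Java-friendly singleton for the integer type:
    sql.udf().register("year", new YearUdf(), DataTypes.IntegerType);

I have not verified that this fixes it, though.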
If I remove the UDF and just join on p.YEARBUILT = yb.yb_class_vdm, the SQL
runs without any problem. But as soon as I add the UDF to the query (as in the
code above), I get the exception below:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate false, [], [Coalesce(SUM(PartialCount#43L),0) AS count#41L]
 Exchange SinglePartition
  Aggregate true, [], [COUNT(1) AS PartialCount#43L]
   Project []
    HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
     Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
      Project [YEARBUILT#14]
       Filter ISVALID#18
        PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
     Exchange (HashPartitioning [yb_class_vdm#40L], 200)
      PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:84)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:887)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:899)
    at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.test(VulnabilityEncodeExecutor.java:137)
    at com.validusresearch.middleware.executor.VulnabilityEncodeExecutor.main(VulnabilityEncodeExecutor.java:488)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
 Aggregate true, [], [COUNT(1) AS PartialCount#43L]
  Project []
   HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
    Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
     Project [YEARBUILT#14]
      Filter ISVALID#18
       PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    Exchange (HashPartitioning [yb_class_vdm#40L], 200)
     PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 6 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Aggregate true, [], [COUNT(1) AS PartialCount#43L]
 Project []
  HashOuterJoin [scalaUDF(YEARBUILT#14)], [yb_class_vdm#40L], LeftOuter, None
   Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
    Project [YEARBUILT#14]
     Filter ISVALID#18
      PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
   Exchange (HashPartitioning [yb_class_vdm#40L], 200)
    PhysicalRDD [yb_class_vdm#40L], MapPartitionsRDD[3] at map at newParquet.scala:573
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Aggregate.execute(Aggregate.scala:124)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:101)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 10 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange (HashPartitioning [scalaUDF(YEARBUILT#14)], 200)
 Project [YEARBUILT#14]
  Filter ISVALID#18
   PhysicalRDD [YEARBUILT#14,ISVALID#18], MapPartitionsRDD[1] at map at newParquet.scala:573
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
    at org.apache.spark.sql.execution.Exchange.execute(Exchange.scala:48)
    at org.apache.spark.sql.execution.joins.HashOuterJoin.execute(HashOuterJoin.scala:188)
    at org.apache.spark.sql.execution.Project.execute(basicOperators.scala:40)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:126)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1.apply(Aggregate.scala:125)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 14 more
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:65)
    at org.apache.spark.sql.execution.Exchange$$anonfun$execute$1.apply(Exchange.scala:49)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:46)
    ... 20 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:158)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 26 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(Unknown Source)
    at java.io.ObjectStreamClass.getObjFieldValues(Unknown Source)
    ... 44 more
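If it helps with reading the trace: the final ArrayIndexOutOfBoundsException is
thrown from inside Spark's SerializationDebugger (improveException), i.e. while
Spark is trying to describe the original serialization failure, so I assume the
real problem is the non-serializable closure rather than the debugger itself.
Any pointers would be appreciated.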