Your function is somehow capturing the actual Avro schema object,
which won't serialize. Try rewriting it to ensure that it isn't used
in the function.
On Tue, Feb 2, 2021 at 2:32 PM Artemis User <arte...@dtechspace.com
<mailto:arte...@dtechspace.com>> wrote:
We tried to standardize the SQL data source management using the
Avro schema, but encountered some serialization exceptions when
trying to use the data. The interesting part is that we didn't
have any problems in reading the Avro schema JSON file and
converting the Avro schema into a SQL StructType, then using it to
create a data frame in subsequent data source load operation. The
problem occurred when later using the data frame with some lambda
functions.
I am a little confused as to why the lambda function
still complains about the Avro Schema Record not being serializable even
after the data frame is already created. Because after the
Avro schema is converted to StructType, which is used by the load
function of DataFrameReader, there shouldn't be any reference to
the Avro schema at all. Any hints and suggestions are highly
appreciated.
-- ND
org.apache.spark.SparkException: Task not serializable at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2362) at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388) at
org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885) at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
at
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at
org.apache.spark.sql.Dataset.collect(Dataset.scala:2940) at
determineAnomalies(<console>:138) ... 60 elided Caused by:
java.io.NotSerializableException:
org.apache.avro.Schema$RecordSchema Serialization stack: - object
not serializable (class: org.apache.avro.Schema$RecordSchema,
value:
{"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
- field (class: $iw, name: schemaObj, type: class
org.apache.avro.Schema) - object (class $iw, $iw@5d09dadc) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@102d60fd) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@cec4fce) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@493fbce0) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@18a15a09) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@163ad83a) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@7b8c10cc) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@4f51c2ed) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@71c5d6e2) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@411c804f) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@23bbcdc4) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@33733309) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@6332bc47) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@60867362) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@37b75cb5) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@421a5931) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@72cda75a) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@4538e79a) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@12be42d3) - field
(class: $iw, name: $iw, type: class $iw) - object (class $iw,
$iw@7aabcd55) - field (class: $iw, name: $iw, type: class $iw) -
object (class $iw, $iw@387078b) - field (class: $iw, name: $iw,
type: class $iw) - object (class $iw, $iw@58e180ea) - field
(class: $line153036384055.$read, name: $iw, type: class $iw) -
object (class $line153036384055.$read,
$line153036384055.$read@448f7b5b) - field (class: $iw, name:
$line153036384055$read, type: class $line153036384055.$read) -
object (class $iw, $iw@2ef8d075) - field (class: $iw, name:
$outer, type: class $iw) - object (class $iw, $iw@1a3f2f26) -
element of array (index: 0) - array (class [Ljava.lang.Object;,
size 2) - field (class: java.lang.invoke.SerializedLambda, name:
capturedArgs, type: class [Ljava.lang.Object;) - object (class
java.lang.invoke.SerializedLambda,
SerializedLambda[capturingClass=class $iw,
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
implementation=invokeStatic
$anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;,
instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;,
numCaptured=2]) - writeReplace data (class:
java.lang.invoke.SerializedLambda) - object (class
$Lambda$4615/190536174, $Lambda$4615/190536174@130972) - element
of array (index: 1) - array (class [Ljava.lang.Object;, size 2) -
element of array (index: 1) - array (class [Ljava.lang.Object;,
size 3) - field (class: java.lang.invoke.SerializedLambda, name:
capturedArgs, type: class [Ljava.lang.Object;) - object (class
java.lang.invoke.SerializedLambda,
SerializedLambda[capturingClass=class
org.apache.spark.sql.execution.WholeStageCodegenExec,
functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
implementation=invokeStatic
org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
numCaptured=3]) - writeReplace data (class:
java.lang.invoke.SerializedLambda) - object (class
org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129,
org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
... 87 more