Your function is somehow capturing the actual Avro Schema object, which
won't serialize (org.apache.avro.Schema is not java.io.Serializable). Try
rewriting the function so that it doesn't reference the Schema object at
all.
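
For example, here's a minimal sketch of the pattern (the file names,
the fieldCount check, and the variable names are placeholders, not
taken from your code — "spark" is the usual SparkSession): convert the
schema up front, then copy anything the lambda needs into a plain
serializable value before the lambda is defined.

  import org.apache.avro.Schema
  import org.apache.spark.sql.avro.SchemaConverters
  import org.apache.spark.sql.types.StructType

  // org.apache.avro.Schema is not java.io.Serializable, so it must never
  // be referenced from inside a closure that Spark ships to executors.
  val schemaObj: Schema =
    new Schema.Parser().parse(new java.io.File("schema.avsc"))
  val structType =
    SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]
  val df = spark.read.schema(structType).csv("data.csv")

  // Bad: mentioning schemaObj pulls the whole Schema (and, in the shell,
  // the enclosing $iw wrapper that holds it) into the serialized closure:
  //   df.filter(row => row.length == schemaObj.getFields.size)

  // Good: extract what you need into a local, serializable value first
  // (a primitive, a String, or the StructType, which is serializable):
  val fieldCount = schemaObj.getFields.size
  val anomalies = df.filter(row => row.length == fieldCount)

Note that in spark-shell every val lives in a generated $iw wrapper
object, which is why your serialization stack shows the schemaObj field
being dragged in; keeping the schema parsing inside a function or a
separate object, so no field of the REPL cell holds the Schema, avoids
that.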

On Tue, Feb 2, 2021 at 2:32 PM Artemis User <arte...@dtechspace.com> wrote:

> We tried to standardize our SQL data source management using Avro
> schemas, but encountered serialization exceptions when trying to use
> the data.  The interesting part is that we had no problems reading the
> Avro schema JSON file, converting the Avro schema into a SQL
> StructType, and then using it to create a data frame in the subsequent
> data source load operation.  The problem occurred later, when using the
> data frame with some lambda functions.
>
> I am a little confused as to why the lambda function still complains
> that the Avro Schema Record is not serializable even after the data
> frame has already been created.  Once the Avro schema is converted to a
> StructType, which is what the load function of DataFrameReader uses,
> there shouldn't be any reference to the Avro schema at all.  Any hints
> and suggestions are highly appreciated.
>
> -- ND
>
> org.apache.spark.SparkException: Task not serializable
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
>     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
>     at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
>     at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
>     at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
>     at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
>     at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
>     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
>     at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
>     at determineAnomalies(<console>:138)
>     ... 60 elided
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
> Serialization stack:
>     - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
>     - field (class: $iw, name: schemaObj, type: class org.apache.avro.Schema)
>     - object (class $iw, $iw@5d09dadc)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@102d60fd)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@cec4fce)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@493fbce0)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@18a15a09)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@163ad83a)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@7b8c10cc)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@4f51c2ed)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@71c5d6e2)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@411c804f)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@23bbcdc4)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@33733309)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@6332bc47)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@60867362)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@37b75cb5)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@421a5931)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@72cda75a)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@4538e79a)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@12be42d3)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@7aabcd55)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@387078b)
>     - field (class: $iw, name: $iw, type: class $iw)
>     - object (class $iw, $iw@58e180ea)
>     - field (class: $line153036384055.$read, name: $iw, type: class $iw)
>     - object (class $line153036384055.$read, $line153036384055.$read@448f7b5b)
>     - field (class: $iw, name: $line153036384055$read, type: class $line153036384055.$read)
>     - object (class $iw, $iw@2ef8d075)
>     - field (class: $iw, name: $outer, type: class $iw)
>     - object (class $iw, $iw@1a3f2f26)
>     - element of array (index: 0)
>     - array (class [Ljava.lang.Object;, size 2)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=2])
>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>     - object (class $Lambda$4615/190536174, $Lambda$4615/190536174@130972)
>     - element of array (index: 1)
>     - array (class [Ljava.lang.Object;, size 2)
>     - element of array (index: 1)
>     - array (class [Ljava.lang.Object;, size 3)
>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>     - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
>     ... 87 more
>
