Thanks Sean.  But the real issue is that, on the surface, my UDF should have no knowledge of the Avro schema at all!  Here are the high-level steps of what happened:

1. Read the JSON to create an Avro Schema object -- OK
2. Convert the Avro schema to a StructType using Spark's
   SchemaConverters -- OK
3. Create a data frame by using the StructType as the schema when
   loading a CSV file -- OK
4. Perform other SQL operations on the data frame -- OK
5. Use the data frame in a UDF -- exception occurred.

We didn't get any exception when we created the struct type manually in step 2.  In other words, does a data frame created using an Avro-converted StructType still keep a reference to the original Avro schema object?
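For reference, here is roughly what those steps look like in code (a minimal sketch, assuming Spark 3.x with the spark-avro module on the classpath; the file names are made up):

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// 1. Read the schema JSON and parse it into an Avro Schema object.
//    (file name is hypothetical)
val schemaJson = scala.io.Source.fromFile("swat_physical.avsc").mkString
val schemaObj  = new Schema.Parser().parse(schemaJson)  // org.apache.avro.Schema -- NOT serializable

// 2. Convert the Avro schema to a Spark StructType.
val structType = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]

// 3. Use the StructType as the schema when loading a CSV file.
val df = spark.read.option("header", "true").schema(structType).csv("swat_physical.csv")

// 4-5. Other SQL operations work fine; the exception only shows up when a
//      UDF/lambda defined in the same REPL scope as schemaObj is executed.
```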

-- ND

On 2/2/21 3:36 PM, Sean Owen wrote:
Your function is somehow capturing the actual Avro schema object, which won't serialize. Try rewriting it to ensure that the schema object isn't used in the function.
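For example (a sketch, not your actual code): copy whatever the function needs out of the schema into plain serializable values first, so the closure captures those instead of the Schema object. In the shell, defining those values as locals inside a method also keeps the closure from dragging in the REPL line wrapper that holds the schema:

```scala
import scala.collection.JavaConverters._

// BAD: the lambda references schemaObj directly, so Spark must serialize
// the non-serializable org.apache.avro.Schema along with the closure.
// df.filter(row => schemaObj.getFields.size == row.length).count()

// OK: extract what you need into a plain serializable local; the closure
// then captures only the Seq[String], not the Schema.
def countMatching(): Long = {
  val fieldNames: Seq[String] = schemaObj.getFields.asScala.map(_.name).toList
  df.filter(row => fieldNames.length == row.length).count()
}
```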

On Tue, Feb 2, 2021 at 2:32 PM Artemis User <arte...@dtechspace.com <mailto:arte...@dtechspace.com>> wrote:

    We tried to standardize our SQL data source management on Avro
    schemas, but encountered some serialization exceptions when trying
    to use the data.  The interesting part is that we had no problems
    reading the Avro schema JSON file, converting the Avro schema into
    a SQL StructType, and then using it to create a data frame in the
    subsequent data source load operation.  The problem occurred later,
    when using the data frame with some lambda functions.

    I am a little confused: why does the lambda function still complain
    that the Avro Schema record is not serializable even after the data
    frame has already been created?  After the Avro schema is converted
    to a StructType, which is what the load function of DataFrameReader
    uses, there shouldn't be any reference to the Avro schema at all.
    Any hints and suggestions are highly appreciated.

    -- ND

    org.apache.spark.SparkException: Task not serializable
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
      at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
      at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
      at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
      at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
      at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
      at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
      at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
      at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
      at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
      at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
      at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
      at determineAnomalies(<console>:138)
      ... 60 elided
    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    Serialization stack:
      - object not serializable (class: org.apache.avro.Schema$RecordSchema, value:
{"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
      - field (class: $iw, name: schemaObj, type: class org.apache.avro.Schema)
      - object (class $iw, $iw@5d09dadc)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@102d60fd)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@cec4fce)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@493fbce0)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@18a15a09)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@163ad83a)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@7b8c10cc)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@4f51c2ed)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@71c5d6e2)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@411c804f)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@23bbcdc4)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@33733309)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@6332bc47)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@60867362)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@37b75cb5)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@421a5931)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@72cda75a)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@4538e79a)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@12be42d3)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@7aabcd55)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@387078b)
      - field (class: $iw, name: $iw, type: class $iw)
      - object (class $iw, $iw@58e180ea)
      - field (class: $line153036384055.$read, name: $iw, type: class $iw)
      - object (class $line153036384055.$read, $line153036384055.$read@448f7b5b)
      - field (class: $iw, name: $line153036384055$read, type: class $line153036384055.$read)
      - object (class $iw, $iw@2ef8d075)
      - field (class: $iw, name: $outer, type: class $iw)
      - object (class $iw, $iw@1a3f2f26)
      - element of array (index: 0)
      - array (class [Ljava.lang.Object;, size 2)
      - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
      - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=2])
      - writeReplace data (class: java.lang.invoke.SerializedLambda)
      - object (class $Lambda$4615/190536174, $Lambda$4615/190536174@130972)
      - element of array (index: 1)
      - array (class [Ljava.lang.Object;, size 2)
      - element of array (index: 1)
      - array (class [Ljava.lang.Object;, size 3)
      - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
      - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
      - writeReplace data (class: java.lang.invoke.SerializedLambda)
      - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
      at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
      at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
      at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
      at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
      ... 87 more
