[
https://issues.apache.org/jira/browse/SPARK-33598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598165#comment-17598165
]
Santokh Singh edited comment on SPARK-33598 at 8/31/22 4:38 AM:
----------------------------------------------------------------
*Facing the same exception on Spark 3.2.2.*
*Using the Avro Maven plugin to generate a Java class from the Avro schema below; calling {{Encoders.bean(person.class)}} on the generated class fails with:*
{color:#ff0000}*Exception in thread "main" java.lang.UnsupportedOperationException: Cannot have circular references in bean class, but got the circular reference of class class org.apache.avro.Schema*{color}
*AVRO SCHEMA*
{code:json}
[
  {
    "type": "record",
    "namespace": "kafka.avro.schema.nested",
    "name": "Address",
    "fields": [
      { "name": "streetaddress", "type": "string" },
      { "name": "city", "type": "string" }
    ]
  },
  {
    "type": "record",
    "name": "person",
    "namespace": "kafka.avro.schema.nested",
    "fields": [
      { "name": "firstname", "type": "string" },
      { "name": "lastname", "type": "string" },
      { "name": "address", "type": ["null", "Address"] }
    ]
  }
]
{code}
*-------CODE--------*
{code:java}
import kafka.avro.schema.nested.person;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import za.co.absa.abris.config.AbrisConfig;

import java.util.concurrent.TimeoutException;

public class KafkaAvroStreamingAbris {

    public static void main(String[] args)
            throws StreamingQueryException, TimeoutException {
        SparkSession spark = SparkSession.builder()
                .appName("AvroApp")
                .master("local")
                .getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "127.0.0.1:9092")
                .option("subscribe", "person")
                .option("startingOffsets", "earliest")
                .load();

        Dataset<Row> df2 = df
                .select(za.co.absa.abris.avro.functions.from_avro(
                        org.apache.spark.sql.functions.col("value"),
                        AbrisConfig
                                .fromConfluentAvro().downloadReaderSchemaByLatestVersion()
                                .andTopicNameStrategy("person", false)
                                .usingSchemaRegistry("http://localhost:8089")).as("data"));

        // Throws here: Encoders.bean(person.class) walks the Avro-generated
        // class and hits the circular reference inside org.apache.avro.Schema.
        Dataset<person> df3 = df2.map(
                (MapFunction<Row, person>) row -> {
                    String rr = row.toString();
                    return null;
                },
                Encoders.bean(person.class));

        StreamingQuery streamingQuery = df2
                .writeStream()
                .queryName("Kafka-Write")
                .format("console")
                .outputMode(OutputMode.Append())
                .trigger(Trigger.ProcessingTime(2000))
                .start();
        streamingQuery.awaitTermination();
    }
}
{code}
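The Avro Maven plugin generates a class whose {{getSchema()}} returns {{org.apache.avro.Schema}}, and {{Schema}}'s own getters return {{Schema}} again; that is the cycle the bean encoder rejects. A minimal stdlib-only sketch of the seen-type walk, using hypothetical stand-in classes ({{FakeSchema}}, {{FakePerson}}) instead of the real Avro/Spark types:

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.HashSet;
import java.util.Set;

public class CircularRefDemo {

    // Plays the role of org.apache.avro.Schema: a bean property of its
    // own type makes the class self-referential.
    public static class FakeSchema {
        public FakeSchema getElementType() { return this; }
    }

    // Plays the role of the Avro-generated 'person' class, which exposes
    // the schema through a getter.
    public static class FakePerson {
        public String getFirstname() { return ""; }
        public FakeSchema getSchema() { return new FakeSchema(); }
    }

    // Simplified version of the seenTypeSet check in Spark's
    // ScalaReflection: recurse into bean property types, and throw as soon
    // as a type already on the current path reappears.
    static void check(Class<?> cls, Set<Class<?>> seen) throws Exception {
        if (!seen.add(cls)) {
            throw new UnsupportedOperationException(
                "Cannot have circular references in bean class, but got the "
                    + "circular reference of class " + cls);
        }
        for (PropertyDescriptor pd
                : Introspector.getBeanInfo(cls).getPropertyDescriptors()) {
            Class<?> t = pd.getPropertyType();
            // Skip primitives, String, and the synthetic "class" property.
            if (t != null && !t.isPrimitive()
                    && t != String.class && t != Class.class) {
                check(t, seen);
            }
        }
        seen.remove(cls);
    }

    public static void main(String[] args) throws Exception {
        try {
            check(FakePerson.class, new HashSet<>());
        } catch (UnsupportedOperationException e) {
            // Same failure mode as Encoders.bean on the generated class.
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check against {{FakePerson}} reproduces the failure mode: the walk reaches {{FakeSchema}} through {{getSchema()}}, then meets {{FakeSchema}} again through {{getElementType()}} while it is still on the path.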
> Support Java Class with circular references
> -------------------------------------------
>
> Key: SPARK-33598
> URL: https://issues.apache.org/jira/browse/SPARK-33598
> Project: Spark
> Issue Type: Improvement
> Components: Java API
> Affects Versions: 3.1.2
> Reporter: jacklzg
> Priority: Minor
>
> If the target Java data class has a circular reference, Spark fails fast when
> creating the Dataset or building the Encoder.
>
> For example, a protobuf-generated class holds a reference to its Descriptor,
> so there is no way to build a Dataset from the protobuf class.
> This line
> {color:#7a869a}Encoders.bean(ProtoBuffOuterClass.ProtoBuff.class);{color}
>
> It will throw out immediately
>
> {quote}Exception in thread "main" java.lang.UnsupportedOperationException:
> Cannot have circular references in bean class, but got the circular reference
> of class class com.google.protobuf.Descriptors$Descriptor
> {quote}
>
> Can we add a parameter? For example:
>
> {code:java}
> Encoders.bean(Class<T> clazz, List<Field> fieldsToIgnore);{code}
>
> or
>
> {code:java}
> Encoders.bean(Class<T> clazz, boolean skipCircularRefField);{code}
>
> so that, instead of throwing an exception at
> [https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L556],
> the encoder skips the field:
>
> {code:scala}
> if (seenTypeSet.contains(t)) {
>   if (skipCircularRefField) {
>     println("field skipped") // just skip this field
>   } else {
>     throw new UnsupportedOperationException(
>       s"cannot have circular references in class, but got the circular reference of class $t")
>   }
> }
> {code}
>
>
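The skip behaviour proposed above can be sketched with plain JavaBeans introspection; the names here ({{collectFields}}, {{Node}}) are hypothetical, and Spark's real walk in {{ScalaReflection}} handles many more cases, but the shape of the change is the same: record the cyclic field instead of throwing.

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SkipCircularRefSketch {

    // A self-referential bean: getParent() returns the bean's own type.
    public static class Node {
        public String getName() { return "n"; }
        public Node getParent() { return this; }
    }

    // Walk the bean's properties; when a property type is already on the
    // current path, add its name to 'skipped' instead of throwing.
    static List<String> collectFields(Class<?> cls, Set<Class<?>> seen,
                                      List<String> skipped) throws Exception {
        List<String> kept = new ArrayList<>();
        seen.add(cls);
        for (PropertyDescriptor pd
                : Introspector.getBeanInfo(cls).getPropertyDescriptors()) {
            Class<?> t = pd.getPropertyType();
            if (t == null || t == Class.class) {
                continue; // ignore the synthetic "class" property
            }
            if (seen.contains(t)) {
                skipped.add(pd.getName()); // the proposed skip, not a throw
            } else {
                kept.add(pd.getName());
            }
        }
        seen.remove(cls);
        return kept;
    }

    public static void main(String[] args) throws Exception {
        List<String> skipped = new ArrayList<>();
        List<String> kept = collectFields(Node.class, new HashSet<>(), skipped);
        System.out.println("kept=" + kept + " skipped=" + skipped);
    }
}
```

With this behaviour the encoder would keep {{name}} and drop {{parent}}, rather than rejecting the whole class.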