Hi Averell,

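It looks as if org.apache.avro.Schema$Field contains a field (reserved,
according to the serialization trace) which is an unmodifiable collection.
Since Flink cannot derive a dedicated serializer for GenericRecord, it falls
back to Kryo, which is used here to copy each record between the chained
operators (see the KryoSerializer.copy frame). Kryo deserializes this field
by creating an unmodifiable collection and then trying to add the elements
to it, which fails with the UnsupportedOperationException you are seeing.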
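
To illustrate the failure mode, here is a minimal plain-Java sketch (no Kryo
or Avro involved, and not Kryo's actual code). It assumes the simplified
"create the collection, then add each element" pattern described above; the
class and method names (UnmodifiableAddDemo, readInto) are hypothetical, and
the unmodifiable view merely stands in for the collection held by
Schema$Field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Hypothetical helper mimicking a field-by-field deserializer: it receives
    // an already created collection and adds the deserialized elements one by one.
    static <T> Collection<T> readInto(Collection<T> target, List<T> elements) {
        for (T element : elements) {
            // Throws UnsupportedOperationException if target is an unmodifiable view.
            target.add(element);
        }
        return target;
    }

    // Same situation as in the stack trace: add() on Collections$UnmodifiableCollection.
    static boolean addFails() {
        Collection<String> target =
                Collections.unmodifiableCollection(new ArrayList<String>());
        try {
            readInto(target, Arrays.asList("name", "type"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // For comparison: the same pattern works on an ordinary mutable collection.
    static boolean addSucceedsOnModifiable() {
        Collection<String> target = new ArrayList<>();
        readInto(target, Arrays.asList("name", "type"));
        return target.size() == 2;
    }

    public static void main(String[] args) {
        System.out.println("unmodifiable add fails: " + addFails());
        System.out.println("modifiable add succeeds: " + addSucceedsOnModifiable());
    }
}
```

Running main prints "unmodifiable add fails: true" followed by
"modifiable add succeeds: true". No amount of Kryo configuration changes the
fact that such a collection rejects add(), which is why switching to a
serializer that understands Avro is the better route.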

I would recommend using Flink's AvroSerializer for serializing GenericRecords
instead. To do so, add org.apache.flink:flink-avro as a dependency to your job
and then tell Flink to use GenericRecordAvroTypeInfo for the stream, for
example via

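// GenericRecordAvroTypeInfo is provided by the flink-avro module
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

DataStream<GenericRecord> sourceStream =
    env.addSource(new AvroGenericSource())
        .returns(new GenericRecordAvroTypeInfo(schema));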

You can find more information about it here [1].

[1] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro

Cheers,
Till

On Wed, Oct 21, 2020 at 1:48 PM Averell <lvhu...@gmail.com> wrote:

> Hi,
>
> I'm trying to convert a stream of JSON strings to a stream of Avro
> GenericRecords and write them to Parquet files, but I get the following
> exception at the line out.collect(genericRecord). If there is no sink,
> there is no error:
> KryoException: java.lang.UnsupportedOperationException
>
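> My code is as follows:
>
> import org.apache.avro.Schema
> import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
> import org.apache.avro.io.DecoderFactory
> import org.apache.flink.core.fs.Path
> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
>     .forBulkFormat(new Path(path),
>         ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
>     .build()
>
> val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
>     @transient
>     private var schema: Schema = _
>     @transient
>     private var reader: GenericDatumReader[GenericRecord] = _
>
>     override def processElement(value: String,
>                                 ctx: ProcessFunction[String, GenericRecord]#Context,
>                                 out: Collector[GenericRecord]): Unit = {
>         // lazily create the schema and reader on the task instead of shipping them with the function
>         if (reader == null) {
>             schema = new Schema.Parser().parse(schemaString)
>             reader = new GenericDatumReader[GenericRecord](schema)
>         }
>         try {
>             val genericRecord = reader.read(null,
>                 DecoderFactory.get.jsonDecoder(schema, value))
>             out.collect(genericRecord)
>         } catch {
>             case e: Throwable =>
>                 LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
>                 throw e
>         }
>     }
> })
> parquetStream.addSink(parquetSink)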
>
> The schema is a simple one in which all fields are strings.
> I tried both Flink 1.10.0 and 1.11.0 and am currently stuck on this.
> Could you please help?
>
> Thanks and regards,
> Averell
>
>
> ============
> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>         at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
>         at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
>         at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException: null
>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         ... 33 common frames omitted