Hi, I'm trying to convert a stream of JSON strings into a stream of Avro GenericRecords and write them to Parquet files, but I get the exception below. It is thrown at the line /out.collect(genericRecord)/. If there's no sink, there's no error. /KryoException: java.lang.UnsupportedOperationException/
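My code is as follows:

    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.io.DecoderFactory
    import org.apache.flink.core.fs.Path
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // path, schemaString, inputStream and LOG are defined elsewhere in the job
    val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
      .forBulkFormat(new Path(path), ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
      .build()

    val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
      @transient private var schema: Schema = _
      @transient private var reader: GenericDatumReader[GenericRecord] = _

      override def processElement(value: String,
                                  ctx: ProcessFunction[String, GenericRecord]#Context,
                                  out: Collector[GenericRecord]): Unit = {
        // Schema and reader are @transient, so build them lazily on the task side
        if (reader == null) {
          schema = new Schema.Parser().parse(schemaString)
          reader = new GenericDatumReader[GenericRecord](schema)
        }
        try {
          val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
          // The KryoException is thrown from this call
          out.collect(genericRecord)
        } catch {
          case e: Throwable =>
            LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
            throw e
        }
      }
    })

    parquetStream.addSink(parquetSink)

The schema is a simple one in which all fields are strings. I tried with both Flink 1.10.0 and 1.11.0 and am currently stuck on this. Could you please help?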
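The "Caused by" at the bottom of the trace ends in `java.util.Collections$UnmodifiableCollection.add`, reached from Kryo's `CollectionSerializer.read` while copying the `reserved` field of Avro's `Schema$Field`. The minimal sketch below uses only the JDK (no Flink, no Avro) to show why that call cannot succeed: any `add` on a `Collections.unmodifiableSet` wrapper throws `UnsupportedOperationException`. The names `UnmodifiableAddDemo` and `tryRebuild` are hypothetical, and `tryRebuild` only approximates the element-by-element `add` loop that the trace suggests Kryo performs; it is not Kryo's actual code.

```scala
import java.util.{Arrays, Collections, HashSet => JHashSet}
import scala.util.{Failure, Success, Try}

object UnmodifiableAddDemo {
  // A read-only set, similar in kind to what the trace shows under
  // Schema$Field.reserved (assumption based on the "Caused by" frames)
  val readOnly: java.util.Set[String] =
    Collections.unmodifiableSet(new JHashSet[String](Arrays.asList("name", "type")))

  // Hypothetical stand-in for a deserializer that refills a collection
  // by calling add(...) once per element
  def tryRebuild(target: java.util.Collection[String], elems: Seq[String]): Try[Unit] =
    Try(elems.foreach(e => target.add(e)))

  def main(args: Array[String]): Unit =
    tryRebuild(readOnly, Seq("doc")) match {
      case Failure(e) => println(e.getClass.getName)
      case Success(_) => println("no exception")
    }
}
```

Running `main` prints `java.lang.UnsupportedOperationException`; passing a plain `java.util.HashSet` to `tryRebuild` instead returns a `Success`.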
Thanks and regards,
Averell
============

    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    reserved (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.UnsupportedOperationException: null
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 33 common frames omitted

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/