Hi,

I'm trying to convert a stream of JSON strings to a stream of Avro
GenericRecords and write it to Parquet files, but I get an exception.
The exception is thrown at the line /out.collect(genericRecord)/. If there is
no sink, there is no error.
/KryoException: java.lang.UnsupportedOperationException/

My code is as follows:
/        val parquetSink: StreamingFileSink[GenericRecord] = StreamingFileSink
            .forBulkFormat(
                new Path(path),
                ParquetAvroWriters.forGenericRecord(new Schema.Parser().parse(schemaString)))
            .build()

        val parquetStream = inputStream.process(new ProcessFunction[String, GenericRecord] {
            // Both fields are created lazily on the first call to processElement.
            @transient
            private var schema: Schema = _
            @transient
            private var reader: GenericDatumReader[GenericRecord] = _

            override def processElement(value: String,
                                        ctx: ProcessFunction[String, GenericRecord]#Context,
                                        out: Collector[GenericRecord]): Unit = {
                if (reader == null) {
                    schema = new Schema.Parser().parse(schemaString)
                    reader = new GenericDatumReader[GenericRecord](schema)
                }
                try {
                    val genericRecord = reader.read(null, DecoderFactory.get.jsonDecoder(schema, value))
                    out.collect(genericRecord)
                } catch {
                    case e: Throwable =>
                        // Log the raw input before rethrowing.
                        LOG.warn(s"Error decoding JSON string: $e\nRaw string: `$value`")
                        throw e
                }
            }
        })
        parquetStream.addSink(parquetSink)
/
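
One thing that may be worth trying: the trace below goes through KryoSerializer.copy, which suggests Flink fell back to Kryo for GenericRecord. A minimal sketch of handing the operator explicit Avro type information instead, assuming the flink-avro module (which provides GenericRecordAvroTypeInfo) is on the classpath. The object and method names are hypothetical, and this has not been run against the job above:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical helper, not part of the original job.
object AvroTypeInfoSketch {
  def toGenericRecords(inputStream: DataStream[String],
                       schemaString: String): DataStream[GenericRecord] = {
    // Avro-aware type information, so the Avro serializer is used rather than Kryo.
    val recordTypeInfo: TypeInformation[GenericRecord] =
      new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaString))

    val decode = new ProcessFunction[String, GenericRecord] {
      @transient private var schema: Schema = _
      @transient private var reader: GenericDatumReader[GenericRecord] = _

      override def processElement(value: String,
                                  ctx: ProcessFunction[String, GenericRecord]#Context,
                                  out: Collector[GenericRecord]): Unit = {
        if (reader == null) {
          schema = new Schema.Parser().parse(schemaString)
          reader = new GenericDatumReader[GenericRecord](schema)
        }
        out.collect(reader.read(null, DecoderFactory.get.jsonDecoder(schema, value)))
      }
    }

    // Pass the type information explicitly instead of relying on implicit derivation.
    inputStream.process(decode)(recordTypeInfo)
  }
}
```

The type information is passed explicitly in the second parameter list so that it cannot be shadowed by the type information the Scala API derives implicitly.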

The schema is a simple one in which all fields are strings.
I tried with both Flink 1.10.0 and 1.11.0, and I'm currently stuck on this.
Could you please help?

Thanks and regards,
Averell


============
/com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:53)
        at mypackage.ParquetFromJson$$anon$1.processElement(ParquetFromJson.scala:44)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: null
        at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
        at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 33 common frames omitted/
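
The "Caused by" frames show CollectionSerializer.read calling add on a java.util.Collections unmodifiable wrapper. The JDK behaviour behind that innermost frame can be reproduced without Flink, Kryo, or Avro. This is only a sketch of that one step: the object name, method name, and the element "doc" are made up for illustration, and it does not reproduce the Kryo copy itself.

```scala
import java.util.{Collections, HashSet => JHashSet, Set => JSet}
import scala.util.Try

// Hypothetical stand-alone demo of the innermost "Caused by" frame.
object UnmodifiableAddDemo {
  // Returns true when add() on an unmodifiable wrapper is rejected
  // with UnsupportedOperationException.
  def addIsRejected(): Boolean = {
    val reserved: JSet[String] = Collections.unmodifiableSet(new JHashSet[String]())
    Try(reserved.add("doc")).failed.toOption
      .exists(_.isInstanceOf[UnsupportedOperationException])
  }

  def main(args: Array[String]): Unit =
    println(s"add() rejected: ${addIsRejected()}")
}
```

Any generic serializer that rebuilds a collection by calling add on it would hit this on such a wrapper, which is consistent with the failure appearing only when the record has to be copied to a downstream operator.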





