Hi,

This simplified code fails with the stacktrace below. If I replace the timestamp logicalType with a regular long in the Avro schema, it works fine. The failure is also independent of the sink (I got the same NPE with a Kafka sink). Do I need to implement a custom Kryo serializer for built-in Avro logicalTypes, or am I missing something?

    GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
    long numberOfRecords = 1000;

    DataGeneratorSource<String> source =
            new DataGeneratorSource<>(generatorFunction, numberOfRecords, Types.STRING);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final Schema abSchema =
            new Schema.Parser()
                    .parse(
                            "{\"type\": \"record\", "
                                    + "\"name\": \"User\", "
                                    + "\"fields\": [\n"
                                    + " {\"name\": \"a\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\" },\n"
                                    + " {\"name\": \"b\", \"type\": [\"null\",\"string\"] }\n"
                                    + " ]\n"
                                    + " }");

    final FileSink<GenericRecord> sink =
            FileSink.forBulkFormat(
                            new Path("/tmp/foo/"),
                            AvroParquetWriters.forGenericRecord(abSchema))
                    .build();

    DataStreamSource<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Generator Source");

    // this for being able to map the stream
    Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
    env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

    stream.map(record -> {
        GenericRecord newRecord = new GenericData.Record(abSchema);
        newRecord.put("a", 1L);
        newRecord.put("b", "bar");
        return newRecord;
    }).sinkTo(sink);

    env.execute();

Stacktrace:

    Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
    Serialization trace:
    props (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:322)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
        at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
        at org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.pollNext(RateLimitedSourceReader.java:69)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.NullPointerException
        at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
        at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
        at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 36 more