Hi,

This simplified code fails with the stack trace below. If I change the
timestamp logicalType to a regular long in the Avro schema, it works
fine. The failure is also independent of the sink (I get the same NPE
with a Kafka sink).
Do I need to implement a custom Kryo serializer for built-in Avro
logicalTypes? Or am I missing something?

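    import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    import org.apache.flink.connector.datagen.source.GeneratorFunction;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // wrapper class name is arbitrary; the body is the original reproduction
    public class AvroLogicalTypeRepro {
      public static void main(String[] args) throws Exception {
        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        long numberOfRecords = 1000;
        DataGeneratorSource<String> source =
            new DataGeneratorSource<>(generatorFunction, numberOfRecords, Types.STRING);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final Schema abSchema =
            new Schema.Parser()
                .parse(
                    "{\"type\": \"record\", "
                        + "\"name\": \"User\", "
                        + "\"fields\": [\n"
                        + "        {\"name\": \"a\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\" },\n"
                        + "        {\"name\": \"b\", \"type\": [\"null\",\"string\"] }\n"
                        + "    ]\n"
                        + "    }");
        final FileSink<GenericRecord> sink =
            FileSink.forBulkFormat(
                    new Path("/tmp/foo/"),
                    AvroParquetWriters.forGenericRecord(abSchema))
                .build();
        DataStreamSource<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Generator Source");
        // needed to be able to map the stream
        Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

        stream.map(record -> {
          GenericRecord newRecord = new GenericData.Record(abSchema);
          newRecord.put("a", 1L);
          newRecord.put("b", "bar");
          return newRecord;
        }).sinkTo(sink);
        env.execute();
      }
    }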

 

Stacktrace:

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:322)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
        at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
        at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
        at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
        at org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.pollNext(RateLimitedSourceReader.java:69)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
        at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
        at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:144)
        at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
        ... 36 more
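A possible explanation, offered as an unverified reading of the trace rather than a confirmed diagnosis: the `map` output is a `GenericRecord` with no Avro type information, so Flink appears to fall back to Kryo, which copies the embedded `Schema` field by field. The trace fails while reading a non-empty `props` map (`props (org.apache.avro.Schema$Field)`), inside `JsonProperties$2.put`. If Kryo creates that map without running its constructor (Kryo can fall back to Objenesis-style instantiation for classes without a usable no-arg constructor), the map's internal storage would be `null` and the first `put` would throw. A plain `long` schema has no such properties, so nothing is ever `put`, which would explain why it works. The stdlib-only sketch below mimics that mechanism; `DelegatingMap`, `allocateWithoutConstructor` and `tryCopy` are hypothetical names, and `sun.misc.Unsafe.allocateInstance` stands in for Objenesis.

```java
import java.lang.reflect.Field;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import sun.misc.Unsafe;

public class ConstructorBypassDemo {

    // Hypothetical stand-in for Avro's internal props map: entries live in a
    // delegate that only the field initializer (i.e. the constructor) sets up.
    static class DelegatingMap extends AbstractMap<String, String> {
        private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();

        @Override
        public String put(String key, String value) {
            // Throws NullPointerException if 'map' was never initialized.
            return map.putIfAbsent(key, value);
        }

        @Override
        public Set<Map.Entry<String, String>> entrySet() {
            return map.entrySet();
        }
    }

    // Allocates an instance WITHOUT running its constructor or field initializers,
    // similar in effect to Objenesis' StdInstantiatorStrategy.
    static <T> T allocateWithoutConstructor(Class<T> type) {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            Unsafe unsafe = (Unsafe) f.get(null);
            return type.cast(unsafe.allocateInstance(type));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot allocate " + type, e);
        }
    }

    // Copies a map the way a generic map serializer reads one back: create an
    // empty instance of the same class, then put() each entry into it.
    // Returns true if the copy succeeded, false if put() threw an NPE.
    static boolean tryCopy(DelegatingMap source) {
        DelegatingMap target = allocateWithoutConstructor(DelegatingMap.class);
        try {
            for (Map.Entry<String, String> e : source.entrySet()) {
                target.put(e.getKey(), e.getValue());
            }
            return true;
        } catch (NullPointerException npe) {
            return false;
        }
    }

    public static void main(String[] args) {
        DelegatingMap noProps = new DelegatingMap();
        DelegatingMap withProps = new DelegatingMap();
        withProps.put("logicalType", "timestamp-millis");

        System.out.println("empty props copied: " + tryCopy(noProps));       // true
        System.out.println("non-empty props copied: " + tryCopy(withProps)); // false
    }
}
```

If that reading is right, the change most likely to help (untested here) is to give the `map` result Avro type information, e.g. `.returns(new GenericRecordAvroTypeInfo(abSchema))` with `GenericRecordAvroTypeInfo` from `org.apache.flink.formats.avro.typeutils` in flink-avro, so Flink uses its Avro serializer instead of Kryo. Separately, the Avro specification declares a logical type on the type itself, e.g. `{"name": "a", "type": {"type": "long", "logicalType": "timestamp-millis"}}`; as written above, `logicalType` probably ends up as a plain property of field `a`.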
