Hi,

I use a FlinkKinesisConsumer in a Flink job to deserialize Kinesis events.

Flink: 1.9.2
Avro: 1.9.2

The serDe class looks like this:

public class ManagedSchemaKinesisPayloadSerDe<T extends SpecificRecord>
        implements KinesisSerializationSchema<T>, KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT = "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;

    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        SpecificData.get().addLogicalTypeConversion(
                new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);

        // some code to create schemaReg

        final KafkaAvroSerializer serializer =
                new KafkaAvroSerializer(schemaReg, new HashMap(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);

        // some code to create schemaReg

        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        return (T) decoder.fromBytes(record);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }

} // end of class ManagedSchemaKinesisPayloadSerDe


// create consumer, stream environment:

ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
        new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);

final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
        streamName,
        serDe,
        streamConfig);

streamEnv
       .addSource(consumer)
       .print();
streamEnv.execute();


The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
        at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
        at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:302)
        at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:737)
        at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:205)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:123)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:125)
        at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:87)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
        at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:185)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 19 more



Observations:

1. The Kinesis events are deserialized SUCCESSFULLY. The exception is thrown after deserialization, when FlinkKinesisConsumer serializes the deserialized object again. FlinkKinesisConsumer creates its own DatumWriter from MyPoJoRecord (a subclass of SpecificRecord), and I have no control over that DatumWriter: for example, I cannot add a logical-type conversion to it or register a custom Kryo serializer. Calling GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()) does not resolve the problem.

2. The Joda type mentioned in the exception comes from "union { null, timestamp_ms }". This union is handled by org.apache.avro.generic.GenericData.resolveUnion(). Because of observation 1, resolveUnion() cannot find any conversions and fails to handle the Joda time value.
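
For context, the field in question would look roughly like this in .avsc form (the field name "eventTime" is made up for illustration; only the union with the timestamp-millis logical type is taken from the schema above):

```json
{
  "name": "eventTime",
  "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
  "default": null
}
```

When no conversion is registered for timestamp-millis, the generated code carries an org.joda.time.DateTime here, which resolveUnion() does not recognize as a member of the union.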

Question:

Is it expected that FlinkKinesisConsumer cannot handle Joda time? Is there a solution or workaround?
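
One workaround I can think of, sketched below but untested: have getProducedType() return a GenericTypeInfo so that Flink falls back to Kryo instead of its own AvroSerializer, and register a Joda-time Kryo serializer. Whether the fallback actually happens for a SpecificRecord subclass, and the JodaDateTimeSerializer class from the de.javakaffee:kryo-serializers library, are assumptions on my part.

```java
// Untested sketch: bypass Flink's AvroSerializer for this type.

// In ManagedSchemaKinesisPayloadSerDe, replace getProducedType():
@Override
public TypeInformation<T> getProducedType() {
    // GenericTypeInfo makes Flink treat the type as a black box and
    // serialize it with Kryo rather than AvroSerializer.
    return new GenericTypeInfo<>(tClass);
}

// When building the job, register a Kryo serializer for Joda DateTime
// (JodaDateTimeSerializer is from de.javakaffee:kryo-serializers):
streamEnv.getConfig().registerTypeWithKryoSerializer(
        org.joda.time.DateTime.class,
        de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.class);
```

I have not verified this against Flink 1.9.2, so please correct me if this cannot work.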

Any help is much appreciated!
