Hi Robert,

Thanks for the info!

On Thu, Aug 27, 2020 at 8:01 PM Robert Metzger <rmetz...@apache.org> wrote:
> Hi,
>
> Check out the KafkaDeserializationSchema (
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html#the-deserializationschema)
> which allows you to deserialize the key and value bytes coming from Kafka.
>
> Best,
> Robert
>
>
> On Thu, Aug 27, 2020 at 1:56 PM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> I have a Kafka topic on which the key is serialized in a custom format
>> and the value is serialized as JSON. How do I create a FlinkKafkaConsumer
>> that has different deserialization schemas for the key and value? Here's
>> what I tried:
>>
>> FlinkKafkaConsumer<Tuple2<MyClass, ObjectNode>> advancedFeatureData = new
>>     FlinkKafkaConsumer<>(ADVANCED_FEATURES_TOPIC, new
>>         TypeInformationKeyValueSerializationSchema<MyClass, ObjectNode>(
>>             TypeInformation.of(new TypeHint<MyClass>() {}),
>>             TypeInformation.of(new TypeHint<ObjectNode>() {}),
>>             env.getConfig()
>>         ), properties);
>>
>> However, I get the error:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 121
>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
>>     at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:112)
>>     at org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema.deserialize(TypeInformationKeyValueSerializationSchema.java:43)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>
>> Is there something I am missing with my approach, or am I supposed to use
>> a completely different class than
>> TypeInformationKeyValueSerializationSchema?
>>
>
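
The approach Robert points to comes down to decoding the key bytes and the value bytes separately, each with its own decoder, inside KafkaDeserializationSchema#deserialize(ConsumerRecord<byte[], byte[]> record). Below is a minimal plain-JDK sketch of that per-record step, not a complete Flink schema. KeyValueDecoder is a hypothetical helper name, not a Flink class. The 4-byte big-endian integer key and the UTF-8 string value in main are placeholders only: the real custom key format is not given in this thread, and a real job would presumably parse the value into an ObjectNode instead of keeping it as a String.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): pairs two independent decoders,
 * one for the record key and one for the record value.
 */
class KeyValueDecoder<K, V> {
    private final Function<byte[], K> keyDecoder;
    private final Function<byte[], V> valueDecoder;

    KeyValueDecoder(Function<byte[], K> keyDecoder, Function<byte[], V> valueDecoder) {
        this.keyDecoder = keyDecoder;
        this.valueDecoder = valueDecoder;
    }

    /**
     * Decodes key and value independently. Kafka records may carry a null key
     * or a null value, so each side is passed through as null when absent.
     */
    Map.Entry<K, V> decode(byte[] keyBytes, byte[] valueBytes) {
        K key = keyBytes == null ? null : keyDecoder.apply(keyBytes);
        V value = valueBytes == null ? null : valueDecoder.apply(valueBytes);
        // SimpleImmutableEntry accepts nulls, unlike Map.entry(...).
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        // Placeholder decoders: the key format here is an assumption for the demo.
        KeyValueDecoder<Integer, String> decoder = new KeyValueDecoder<>(
                bytes -> ByteBuffer.wrap(bytes).getInt(),          // stand-in for the custom key format
                bytes -> new String(bytes, StandardCharsets.UTF_8) // stand-in for JSON parsing
        );

        byte[] key = {0, 0, 0, 42};
        byte[] value = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);

        // Inside a Flink KafkaDeserializationSchema, the equivalent call would be
        // decoder.decode(record.key(), record.value()) in deserialize(...).
        Map.Entry<Integer, String> entry = decoder.decode(key, value);
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```

In a Flink job, the result would be wrapped in a Tuple2<MyClass, ObjectNode> rather than a Map.Entry, and getProducedType() would return the matching TypeInformation. Since the bytes are decoded by hand, this path does not go through the Kryo-based deserialization that appears in the stack trace above.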