Can I ask you to help me? I trying to implement a CustomDeserializer My kafka messages are composed by KeyedMessages where key and messages are strings. I created a new class named CustomObject to manage the message string because it's more complex then a simple string.
public class CustomDeserializer implements KeyedDeserializationSchema<Tuple2<String,CustomObject>>{ @Override public boolean isEndOfStream(Tuple2<String, CustomJSONObject> nextElement) { return false; } @Override public TypeInformation<Tuple2<String, CustomJSONObject>> getProducedType() { return null; } @Override public Tuple2<String, CustomJSONObject> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { String key = new String(messageKey); String msg = new String(message); CustomObject customObj = new CustomObject(msg); Tuple2<String,CustomObject> tuple = new Tuple2<String,CustomObject>(key, customObj); return tuple; } } Questions: - I don't understand what is getProducedType method and its usefulness. - Which methods have I to implement in my CustomObject class? My main: DataStream<Tuple2<String,CustomJSONObject>> stream = env.addSource(new FlinkKafkaConsumer010<>("topicTest", new CustomDeserializer(), properties)).rebalance(); stream.print(); If I execute it I get a nullPointerException so I imagine miss something in CustomObject class: I have implemented just a toString() method. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-t-get-keyed-messages-from-Kafka-tp13687p13702.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.