Hi, Can you provide the full stack trace of your exception? Most likely, the error is caused by this setting:
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName()); You need to use Flink's DeserializationSchema. On Mon, May 4, 2020 at 10:26 AM Manish G <manish.c.ghildi...@gmail.com> wrote: > I have following code: > > ////////////////////// > Properties properties = new Properties(); > properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > MyCustomClassDeserializer.class.getName()); > > FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer( > "test-kafka=topic", > new SimpleStringSchema(), > properties); > > final StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream<MyCustomClass> kafkaInputStream = > streamEnv.addSource(kafkaConsumer); > > DataStream<String> stringStream = kafkaInputStream > .map(new MapFunction<MyCustomClass,String>() { > @Override > public String map(MyCustomClass message) { > logger.info("--- Received message : " + > message.toString()); > return message.toString(); > } > }); > > streamEnv.execute("Published messages"); > > /////// > MyCustomClassDeserializer is implemented as: > > public MyCustomClass deserialize(String s, byte[] bytes) { > return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, > MyCustomClass.class); > } > > When I run this program locally, I get error: > Caused by: org.apache.flink.api.common.functions.InvalidTypesException: > Input mismatch: Basic type expected. > > Why I get this error? >