I have the following code:

//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomClassDeserializer.class.getName());

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
        "test-kafka=topic",
        new SimpleStringSchema(),
        properties);

final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyCustomClass> kafkaInputStream = streamEnv.addSource(kafkaConsumer);

DataStream<String> stringStream = kafkaInputStream
        .map(new MapFunction<MyCustomClass, String>() {
            @Override
            public String map(MyCustomClass message) {
                logger.info("--- Received message : " + message.toString());
                return message.toString();
            }
        });

streamEnv.execute("Published messages");
///////

MyCustomClassDeserializer is implemented as:

public MyCustomClass deserialize(String s, byte[] bytes) {
    return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
}

When I run this program locally, I get this error:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Basic type expected.

Why do I get this error?
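
One possible reading of the error, offered as a guess rather than a confirmed diagnosis: the consumer is constructed as a raw type with SimpleStringSchema, so the stream's runtime element type may be String (a basic type) even though the variable is declared as FlinkKafkaConsumer<MyCustomClass>, and the map function then expects MyCustomClass. The sketch below reproduces that kind of mismatch in plain Java without Flink. Every name in it (Schema, StringSchema, Source, buildRawSource, typesMatch) is a hypothetical stand-in, not a Flink class or method, and the empty MyCustomClass is only a placeholder.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins only: none of these types are Flink's classes.
class RawTypeMismatchSketch {

    /** Stand-in for a schema that turns bytes into T and reports T at runtime. */
    interface Schema<T> {
        T deserialize(byte[] bytes);
        Class<T> producedType();
    }

    /** Plays the role of a String-producing schema. */
    static final class StringSchema implements Schema<String> {
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public Class<String> producedType() {
            return String.class;
        }
    }

    /** Placeholder for the custom message type from the question. */
    static final class MyCustomClass { }

    /** Stand-in for a source whose element type is taken from its schema. */
    static final class Source<T> {
        private final Schema<T> schema;

        Source(Schema<T> schema) {
            this.schema = schema;
        }

        Class<T> producedType() {
            return schema.producedType();
        }
    }

    /**
     * The raw constructor call compiles with only an unchecked warning,
     * so the declared type says MyCustomClass while the schema yields String.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Source<MyCustomClass> buildRawSource() {
        return new Source(new StringSchema());
    }

    /** Runtime check: can the source's produced type be used as the expected input? */
    static boolean typesMatch(Source<?> source, Class<?> expectedInput) {
        return expectedInput.isAssignableFrom(source.producedType());
    }

    public static void main(String[] args) {
        Source<MyCustomClass> source = buildRawSource();
        // prints "matches MyCustomClass: false"
        System.out.println("matches MyCustomClass: " + typesMatch(source, MyCustomClass.class));
        // prints "matches String: true"
        System.out.println("matches String: " + typesMatch(source, String.class));
    }
}
```

If this reading is right, the declared generic type on the consumer would not change what the schema produces, so the schema passed to the consumer would need to produce MyCustomClass itself.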