Thanks. It worked after introducing a custom DeserializationSchema.

On Mon, May 4, 2020 at 3:04 PM Robert Metzger <rmetz...@apache.org> wrote:
> Hi,
> Can you provide the full stack trace of your exception?
> Most likely, the error is caused by this setting:
>
> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         MyCustomClassDeserializer.class.getName());
>
> You need to use Flink's DeserializationSchema.
>
> On Mon, May 4, 2020 at 10:26 AM Manish G <manish.c.ghildi...@gmail.com>
> wrote:
>
>> I have the following code:
>>
>> //////////////////////
>> Properties properties = new Properties();
>> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>         MyCustomClassDeserializer.class.getName());
>>
>> FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
>>         "test-kafka=topic",
>>         new SimpleStringSchema(),
>>         properties);
>>
>> final StreamExecutionEnvironment streamEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<MyCustomClass> kafkaInputStream =
>>         streamEnv.addSource(kafkaConsumer);
>>
>> DataStream<String> stringStream = kafkaInputStream
>>         .map(new MapFunction<MyCustomClass,String>() {
>>             @Override
>>             public String map(MyCustomClass message) {
>>                 logger.info("--- Received message : " +
>>                         message.toString());
>>                 return message.toString();
>>             }
>>         });
>>
>> streamEnv.execute("Published messages");
>>
>> ///////
>> MyCustomClassDeserializer is implemented as:
>>
>> public MyCustomClass deserialize(String s, byte[] bytes) {
>>     return (MyCustomClass) JsonUtil.convertBytesToObject(bytes,
>>             MyCustomClass.class);
>> }
>>
>> When I run this program locally, I get this error:
>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Input mismatch: Basic type expected.
>>
>> Why do I get this error?
>>
>
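For anyone finding this thread later, here is a rough sketch of the shape of the fix mentioned at the top (a custom schema that turns the raw record bytes into MyCustomClass). The likely cause of the exception is that the consumer was built with SimpleStringSchema, which produces String elements, while the map function declares MyCustomClass as its input. To keep the sketch compilable without Flink on the classpath, it uses a local stand-in interface, BytesSchema, which is my own name and mirrors only the deserialize(byte[]) method of Flink's org.apache.flink.api.common.serialization.DeserializationSchema<T>. The real interface also declares isEndOfStream(T) and getProducedType(). The MyCustomClass below is a hypothetical placeholder with one field, and the UTF-8 decoding stands in for the JsonUtil.convertBytesToObject call from the original post.

```java
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class SchemaSketch {

    // Hypothetical placeholder: the thread does not show the fields of MyCustomClass.
    static class MyCustomClass implements Serializable {
        final String payload;

        MyCustomClass(String payload) {
            this.payload = payload;
        }

        @Override
        public String toString() {
            return "MyCustomClass{payload=" + payload + "}";
        }
    }

    // Local stand-in (assumption) mirroring only deserialize(byte[]) from Flink's
    // DeserializationSchema<T>. In a real job, implement the Flink interface
    // instead and also provide isEndOfStream(T) and getProducedType().
    interface BytesSchema<T> extends Serializable {
        T deserialize(byte[] message) throws IOException;
    }

    // The element type produced here (MyCustomClass) is what the downstream
    // map function must declare as its input type.
    static class MyCustomClassSchema implements BytesSchema<MyCustomClass> {
        @Override
        public MyCustomClass deserialize(byte[] message) {
            // Placeholder for JsonUtil.convertBytesToObject(message, MyCustomClass.class).
            return new MyCustomClass(new String(message, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        BytesSchema<MyCustomClass> schema = new MyCustomClassSchema();
        MyCustomClass value = schema.deserialize("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(value); // prints MyCustomClass{payload=hello}
    }
}
```

With the real Flink interface, an instance of such a schema would be passed as the second argument of the FlinkKafkaConsumer constructor in place of new SimpleStringSchema(), and the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG property would no longer be needed.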