Hi,
Can you provide the full stack trace of your exception?
Most likely, the error is caused by this setting:

properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
MyCustomClassDeserializer.class.getName());

You need to use Flink's DeserializationSchema instead: the FlinkKafkaConsumer deserializes records with the schema you pass to its constructor, and here that is SimpleStringSchema, which produces String rather than MyCustomClass, hence the type mismatch.
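
A minimal sketch of what that could look like (assuming your JsonUtil.convertBytesToObject(bytes, MyCustomClass.class) works as in your existing deserializer; MyCustomClassSchema is just a name I picked for the new class):

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class MyCustomClassSchema implements DeserializationSchema<MyCustomClass> {

    @Override
    public MyCustomClass deserialize(byte[] bytes) throws IOException {
        // reuse your existing JSON conversion
        return (MyCustomClass) JsonUtil.convertBytesToObject(bytes, MyCustomClass.class);
    }

    @Override
    public boolean isEndOfStream(MyCustomClass nextElement) {
        // a Kafka source is unbounded
        return false;
    }

    @Override
    public TypeInformation<MyCustomClass> getProducedType() {
        // tells Flink which type this schema produces
        return TypeInformation.of(MyCustomClass.class);
    }
}

Then pass it to the consumer in place of SimpleStringSchema (the value.deserializer property is no longer needed):

FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer<>(
        "test-kafka=topic",
        new MyCustomClassSchema(),
        properties);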

On Mon, May 4, 2020 at 10:26 AM Manish G <manish.c.ghildi...@gmail.com>
wrote:

> I have following code:
>
> //////////////////////
> Properties properties = new Properties();
> properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> MyCustomClassDeserializer.class.getName());
>
> FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
>                     "test-kafka=topic",
>                     new SimpleStringSchema(),
>                     properties);
>
> final StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<MyCustomClass> kafkaInputStream =
> streamEnv.addSource(kafkaConsumer);
>
> DataStream<String> stringStream = kafkaInputStream
>                     .map(new MapFunction<MyCustomClass,String>() {
>                         @Override
>                         public String map(MyCustomClass message) {
>                             logger.info("--- Received message : " +
> message.toString());
>                             return message.toString();
>                         }
>                     });
>
> streamEnv.execute("Published messages");
>
> ///////
> MyCustomClassDeserializer is implemented as:
>
> public MyCustomClass deserialize(String s, byte[] bytes) {
>         return (MyCustomClass) JsonUtil.convertBytesToObject(bytes,
> MyCustomClass.class);
>     }
>
> When I run this program locally, I get this error:
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> Input mismatch: Basic type expected.
>
> Why do I get this error?
>
