I have the following code:
//////////////////////
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        MyCustomClassDeserializer.class.getName());
FlinkKafkaConsumer<MyCustomClass> kafkaConsumer = new FlinkKafkaConsumer(
        "test-kafka=topic",
        new SimpleStringSchema(),
        properties);
final StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyCustomClass> kafkaInputStream =
        streamEnv.addSource(kafkaConsumer);
DataStream<String> stringStream = kafkaInputStream
        .map(new MapFunction<MyCustomClass, String>() {
            @Override
            public String map(MyCustomClass message) {
                logger.info("--- Received message : " +
                        message.toString());
                return message.toString();
            }
        });
streamEnv.execute("Published messages");
///////
MyCustomClassDeserializer is implemented as:
public MyCustomClass deserialize(String s, byte[] bytes) {
    return (MyCustomClass) JsonUtil.convertBytesToObject(bytes,
            MyCustomClass.class);
}
When I run this program locally, I get this error:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Input mismatch: Basic type expected.
Why do I get this error?
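One likely explanation, offered as a sketch rather than a confirmed diagnosis: the element type of a FlinkKafkaConsumer comes from the schema passed to its constructor, and SimpleStringSchema produces String, while the stream and the MapFunction are declared for MyCustomClass. Because the constructor is called as a raw type (new FlinkKafkaConsumer(...) without <>), the compiler only emits an unchecked warning and the mismatch surfaces later, at run time. The example below reproduces that effect using only the JDK. Every type in it (Schema, StringSchema, Consumer, and the empty MyCustomClass) is a hypothetical stand-in, not Flink's real API.

```java
import java.nio.charset.StandardCharsets;

public class RawTypeDemo {

    // Hypothetical stand-in for a deserialization schema: turns bytes into a T.
    interface Schema<T> {
        T deserialize(byte[] bytes);
    }

    // Hypothetical stand-in for SimpleStringSchema: it always produces String.
    static class StringSchema implements Schema<String> {
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical stand-in for the consumer: its element type is fixed by the schema.
    static class Consumer<T> {
        private final Schema<T> schema;

        Consumer(Schema<T> schema) {
            this.schema = schema;
        }

        T next(byte[] raw) {
            return schema.deserialize(raw);
        }
    }

    // Hypothetical stand-in for the question's MyCustomClass.
    static class MyCustomClass {
    }

    // Returns true if the type mismatch is only detected at run time.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean rawConstructorHidesMismatch() {
        // Raw constructor call: this compiles (with an unchecked warning) even
        // though the schema yields String. Writing new Consumer<>(...) instead
        // would be rejected by the compiler.
        Consumer<MyCustomClass> consumer = new Consumer(new StringSchema());
        try {
            // The compiler inserts a cast to MyCustomClass here; the value is a String.
            MyCustomClass value = consumer.next("hello".getBytes(StandardCharsets.UTF_8));
            return value == null; // not reached when the cast fails
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("mismatch detected only at run time: "
                + rawConstructorHidesMismatch());
    }
}
```

If this is indeed the cause, the usual remedy is to pass the consumer a schema whose output type is MyCustomClass, for example a class implementing Flink's DeserializationSchema<MyCustomClass>, and to construct it with new FlinkKafkaConsumer<>(...) so the compiler can check the types. The VALUE_DESERIALIZER_CLASS_CONFIG property is probably not what determines the stream's element type, since the Flink consumer reads records as raw bytes and hands them to its own schema.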