Hi,

You can use a DeserializationSchema with KafkaSource as well. You can pass it to the KafkaSource.builder():

    KafkaSource.<...>builder()
        .setValueOnlyDeserializer(...)

(setDeserializer(...) on the same builder expects a KafkaRecordDeserializationSchema instead.)

You can also take a look at the StateMachineExample[1], where KafkaSource is used.
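To make the deserializer part more concrete, here is a minimal sketch of the parsing step for a "Long,Timestamp,String,String" message. It is plain Java with no Flink dependency, so it only illustrates the logic that a custom deserialize(byte[]) method could delegate to. The names EventParser and Event, the field names, the UTF-8 encoding and the ISO-8601 timestamp format are all assumptions on my side, since the actual message layout is not shown in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical helper: turns one Kafka message value into a typed object.
final class EventParser {

    // Hypothetical value type for one "Long,Timestamp,String,String" record.
    public static final class Event {
        public final long id;
        public final Instant timestamp;
        public final String first;
        public final String second;

        Event(long id, Instant timestamp, String first, String second) {
            this.id = id;
            this.timestamp = timestamp;
            this.first = first;
            this.second = second;
        }
    }

    // Assumes UTF-8 text, an ISO-8601 timestamp, and no commas
    // inside the first three fields.
    public static Event parse(byte[] message) {
        String line = new String(message, StandardCharsets.UTF_8).trim();
        // Limit 4: any extra commas stay inside the last String field.
        String[] parts = line.split(",", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 fields but got " + parts.length + ": " + line);
        }
        return new Event(
                Long.parseLong(parts[0].trim()),
                Instant.parse(parts[1].trim()),
                parts[2],
                parts[3]);
    }
}
```

In a Flink job, a class implementing DeserializationSchema<Event> could call EventParser.parse(...) from its deserialize(byte[]) method, and the source would then be declared as KafkaSource<Event> rather than KafkaSource<String>. The type parameter of the source is the type the deserializer produces.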
BTW, have you tried looking at the Table API? It would abstract quite a few things for you, e.g. parsing what I presume is a CSV format[2] in your case.
Best,
Dawid

[1] https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
On 03/02/2022 16:56, HG wrote:
Hello

Most examples available still use the FlinkKafkaConsumer unfortunately. I need to consume events from Kafka. The format is Long,Timestamp,String,String. Do I need to create a custom deserializer?

What also confuses me is KafkaSource<String> source = KafkaSource.... How does it relate to the deserializer? Is there a kind of <Row> type or is <String> fine even if the message is a composite of Long,String...?

Regards
Hans