Hey, this is a great solution for now, thanks. In the end I decided to use the Table API with the RAW format, as I needed access to the Kafka event timestamp.
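In case it helps anyone else, the table definition looked roughly like the sketch below (the table name, topic, brokers, and group id are placeholders, not my actual setup). With the RAW format the whole Kafka value arrives as a single STRING column, and the record timestamp comes in as a metadata column:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # RAW format: the whole Kafka value is mapped to one physical STRING
    # column, so messages with different JSON schemas can share a topic.
    # The Kafka record timestamp is exposed as a metadata column.
    t_env.execute_sql("""
        CREATE TABLE raw_events (
            message STRING,
            event_time TIMESTAMP(3) METADATA FROM 'timestamp'
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'my-group',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'raw'
        )
    """)

The per-message JSON in the 'message' column can then be picked apart in a Python UDF (e.g. with json.loads), since the schema differs from message to message.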
Thanks a lot.

Best Regards
Kamil

On Mon, 22 Nov 2021 at 02:31, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Kamil,
>
> Actually, FlinkKafkaConsumer expects any DeserializationSchema, not
> JsonRowDeserializationSchema specifically, so I guess you could try
> SimpleStringSchema.
>
> Regards,
> Dian
>
> On Sat, Nov 20, 2021 at 5:55 AM Kamil ty <kamilt...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm working on a PyFlink job that's supposed to consume JSON messages
>> from Kafka and save them to a partitioned Avro file sink.
>> I'm having difficulty finding a solution for how to process the
>> messages, because there is only one Kafka topic for multiple message
>> schemas. As PyFlink's FlinkKafkaConsumer expects a
>> JsonRowDeserializationSchema, I assume that all of the messages need a
>> constant, predefined schema. I expect the same for the Kafka Table API.
>>
>> The messages follow the general Debezium message schema. Example data
>> taken from the Flink docs:
>>
>> {
>>   "schema": {...},
>>   "payload": {
>>     "before": {
>>       "id": 111,
>>       "name": "scooter",
>>       "description": "Big 2-wheel scooter",
>>       "weight": 5.18
>>     },
>>     "after": {
>>       "id": 111,
>>       "name": "scooter",
>>       "description": "Big 2-wheel scooter",
>>       "weight": 5.15
>>     },
>>     "source": {...},
>>     "op": "u",
>>     "ts_ms": 1589362330904,
>>     "transaction": null
>>   }
>> }
>>
>> The messages arrive on a single Kafka topic, and the 'schema', 'after',
>> and 'before' fields can be different for each message. The Kafka message
>> key also contains the 'schema' field from the above example. My question
>> is whether there is a way to process such messages coming from a single
>> Kafka topic with PyFlink without writing a custom DeserializationSchema.
>> Any help would be appreciated.
>>
>> Kind Regards
>> Kamil
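P.S. For anyone who lands on this thread later: Dian's SimpleStringSchema suggestion would look roughly like the sketch below in the DataStream API (topic, brokers, and group id are placeholders). Each Kafka value then reaches Python as a plain string that can be parsed per record:

    import json

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE: the Kafka connector jar must be on the classpath,
    # e.g. added via env.add_jars(...).

    # SimpleStringSchema hands every Kafka value to Python as a plain
    # string, so no fixed schema is imposed at deserialization time.
    consumer = FlinkKafkaConsumer(
        topics='events',
        deserialization_schema=SimpleStringSchema(),
        properties={'bootstrap.servers': 'localhost:9092',
                    'group.id': 'my-group'})

    # Parse each message individually; the JSON layout may differ
    # from record to record.
    parsed = env.add_source(consumer).map(lambda raw: json.loads(raw))

    env.execute('kafka-json-per-record-parsing')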