Hello all,

I'm working on a PyFlink job that is supposed to consume JSON messages from
Kafka and write them to a partitioned Avro file sink.
I'm having trouble working out how to process the messages, because a
single Kafka topic carries multiple message schemas. Since PyFlink's
FlinkKafkaConsumer expects a JsonRowDeserializationSchema, I assume that
all of the messages need one fixed, predefined schema. I expect the same
applies to the Kafka connector in the Table API.

The messages follow the general Debezium message format.
Example data taken from the Flink docs:

{
  "schema": {...},
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u",
    "ts_ms": 1589362330904,
    "transaction": null
  }
}
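
To make the variation concrete, here is a minimal sketch in plain Python (no
PyFlink calls) of how two messages from the same topic can end up with
different row shapes. The helper names unpack, field_names and
group_by_shape are made up for illustration, the 'schema' and 'source'
fields are left out, and the second message is invented:

```python
import json

# First message: the example above, without the elided "schema"/"source".
PRODUCT_UPDATE = json.dumps({
    "payload": {
        "before": {"id": 111, "name": "scooter",
                   "description": "Big 2-wheel scooter", "weight": 5.18},
        "after": {"id": 111, "name": "scooter",
                  "description": "Big 2-wheel scooter", "weight": 5.15},
        "op": "u",
        "ts_ms": 1589362330904,
        "transaction": None,
    }
})

# Second message: invented, only to show a different 'after' shape.
ORDER_CREATE = json.dumps({
    "payload": {
        "before": None,
        "after": {"order_id": 1, "quantity": 2},
        "op": "c",
        "ts_ms": 1589362330904,
        "transaction": None,
    }
})

RAW_MESSAGES = [PRODUCT_UPDATE, ORDER_CREATE]


def unpack(raw):
    """Return (op, before, after) from one Debezium-style JSON message."""
    payload = json.loads(raw)["payload"]
    return payload["op"], payload.get("before"), payload.get("after")


def field_names(record):
    """Sorted field names of a row image, or () when the image is missing."""
    return tuple(sorted(record)) if record else ()


def group_by_shape(raw_messages):
    """Group the 'after' row images by their set of field names."""
    groups = {}
    for raw in raw_messages:
        _op, _before, after = unpack(raw)
        groups.setdefault(field_names(after), []).append(after)
    return groups


if __name__ == "__main__":
    for shape, rows in group_by_shape(RAW_MESSAGES).items():
        print(shape, len(rows))
```

Each group here would need its own row type, which is why a single fixed
schema does not seem to fit the topic.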

All messages arrive on a single Kafka topic, and the 'schema', 'before'
and 'after' fields can differ from message to message. The Kafka
message key also contains the 'schema' field from the example above. My
question: is there a way to process such messages from a single Kafka
topic with PyFlink without writing a custom DeserializationSchema?
Any help would be appreciated.

Kind Regards
Kamil
