Hey,
This is a great solution for now, thanks. In the end I decided to use the
Table API with the RAW format, as I needed access to the Kafka event
timestamp.
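
For reference, a minimal sketch of roughly what I ended up with (the
topic, broker, and group names below are placeholders, not my actual
job):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# The 'raw' format requires a single physical column; the Kafka record
# timestamp is exposed as a metadata column alongside it.
t_env.execute_sql("""
    CREATE TABLE raw_events (
        message STRING,
        kafka_ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'raw'
    )
""")

Each record then arrives as a plain string, so the per-message JSON can
be picked apart afterwards, e.g. in a Python UDF.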

Thanks a lot.

Best Regards
Kamil

On Mon, 22 Nov 2021 at 02:31, Dian Fu <dian0511...@gmail.com> wrote:

> Hi Kamil,
>
> Actually, FlinkKafkaConsumer accepts any DeserializationSchema, not just
> JsonRowDeserializationSchema, so I guess you could try SimpleStringSchema.
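>
> Something along these lines (a sketch only; the topic, broker, and jar
> path are placeholders):
>
> import json
>
> from pyflink.common.serialization import SimpleStringSchema
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer
>
> env = StreamExecutionEnvironment.get_execution_environment()
> # Assumes the Kafka connector jar is available to the job.
> env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")
>
> consumer = FlinkKafkaConsumer(
>     topics='my-topic',
>     deserialization_schema=SimpleStringSchema(),
>     properties={'bootstrap.servers': 'localhost:9092',
>                 'group.id': 'my-group'})
>
> # Each record arrives as a raw string; parse it per message, so
> # differing schemas on the same topic are not a problem.
> stream = env.add_source(consumer).map(lambda s: json.loads(s))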
>
> Regards,
> Dian
>
> On Sat, Nov 20, 2021 at 5:55 AM Kamil ty <kamilt...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm working on a PyFlink job that's supposed to consume JSON messages
>> from Kafka and save them to a partitioned Avro file sink.
>> I'm having difficulty finding a way to process the messages, because
>> there is only one Kafka topic for multiple message schemas. As
>> PyFlink's FlinkKafkaConsumer expects a JsonRowDeserializationSchema, I
>> assume that all of the messages need a single fixed schema. I expect
>> the same for the Kafka Table API.
>>
>> The messages follow the general Debezium message schema. Example data
>> taken from the Flink docs:
>>
>> {
>>   "schema": {...},
>>   "payload": {
>>     "before": {
>>       "id": 111,
>>       "name": "scooter",
>>       "description": "Big 2-wheel scooter",
>>       "weight": 5.18
>>     },
>>     "after": {
>>       "id": 111,
>>       "name": "scooter",
>>       "description": "Big 2-wheel scooter",
>>       "weight": 5.15
>>     },
>>     "source": {...},
>>     "op": "u",
>>     "ts_ms": 1589362330904,
>>     "transaction": null
>>   }
>> }
>>
>> The messages arrive on a single Kafka topic, where the 'schema',
>> 'after', and 'before' fields can differ for each message. The Kafka
>> message key also contains the 'schema' field from the above example.
>> My question is whether there is a way to process such messages from a
>> single Kafka topic with PyFlink without writing a custom
>> DeserializationSchema.
>> Any help would be appreciated.
>>
>> Kind Regards
>> Kamil
>>
>
