Hussein,

To use a JsonRowDeserializationSchema you'll need to use the Table API
rather than the DataStream API.

You'll want to use a JsonRowSchemaConverter to convert your JSON schema
into the TypeInformation needed by Flink; the JsonRowDeserializationSchema
builder does this for you:

    json_row_schema = JsonRowDeserializationSchema.builder() \
        .json_schema(json_schema_string) \
        .build()
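As for reading the schema from a file: the builder just takes a string, so
loading your 1500-line schema from disk is ordinary file I/O. A minimal
sketch (the file name and schema contents here are made up for illustration):

```python
import json
from pathlib import Path

# Hypothetical schema file; in practice this is your 1500-line JSON schema.
schema_path = Path("orders_schema.json")
schema_path.write_text(json.dumps({
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "amount": {"type": "number"},
    },
}))

# Read the schema as a plain string instead of typing it into the program.
json_schema_string = schema_path.read_text()

# Optional sanity check: fail fast if the file isn't valid JSON.
parsed = json.loads(json_schema_string)
print(sorted(parsed["properties"]))

# The string is then passed to the builder exactly as above:
#     JsonRowDeserializationSchema.builder().json_schema(json_schema_string).build()
```

This keeps the schema out of your program entirely; only the file path
appears in code.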

Given that schema, you can pass it to the constructor of the Kafka consumer:

    kafka_consumer = FlinkKafkaConsumer("stream-source", json_row_schema, kafka_props)

To read from 3 different topics, you can either instantiate three separate
sources, or have a single source read from multiple topics by passing a
list of strings as the topics parameter.
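To make that concrete, here is a sketch of both options. Note that a single
consumer applies one deserialization schema to every topic it reads, so the
list-of-topics form fits best when the three topics share a schema. The topic
names and Kafka properties below are made up, `schema_for` is a hypothetical
per-topic helper, and the FlinkKafkaConsumer calls are shown in comments
since they need a running PyFlink job to execute:

```python
# Hypothetical topic names and broker address -- adjust for your cluster.
topics = ["orders", "deliveries", "payments"]
kafka_props = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
}

# Option 1: one source reading all three topics (shared schema):
#
#     kafka_consumer = FlinkKafkaConsumer(topics, json_row_schema, kafka_props)
#
# Option 2: if each topic has its own JSON schema, one source per topic,
# where schema_for(topic) is a hypothetical helper that builds a
# JsonRowDeserializationSchema from that topic's schema file:
#
#     consumers = [
#         FlinkKafkaConsumer(topic, schema_for(topic), kafka_props)
#         for topic in topics
#     ]

print(len(topics))
```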

Regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul <huss...@quiqup.com>
wrote:

> Hello,
>
> How to specify the deserialization schema for multiple Kafka topics using
> Flink (python)
>
> I want to read from multiple Kafka topics with JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How to specify deserialization schema for multiple topics (3 topics)
> 2. How to read the JSON schema from a file?
>
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer
