Hussein,

To use a JsonRowDeserializationSchema you'll need to use the Table API, not the DataStream API.
You'll want to use a JsonRowSchemaConverter to convert your JSON schema into the TypeInformation needed by Flink, which is done for you by the JsonRowDeserializationSchema builder:

    json_row_schema = JsonRowDeserializationSchema.builder().json_schema(json_schema_string).build()

Given that schema, you can pass it to the constructor for the Kafka consumer:

    kafka_consumer = FlinkKafkaConsumer("stream-source", json_row_schema, kafka_props)

To read from 3 different topics, you can either instantiate three different sources, or specify that a single source is to be used to read from multiple topics, which you can do by passing a list of strings as the topics parameter.

Regards,
David

On Fri, Jan 28, 2022 at 12:07 PM Hussein El Ghoul <huss...@quiqup.com> wrote:
> Hello,
>
> How do I specify the deserialization schema for multiple Kafka topics using
> Flink (Python)?
>
> I want to read from multiple Kafka topics with a JSON schema using
> FlinkKafkaConsumer, and I assume that I need to use
> JsonRowDeserializationSchema to deserialize the data. The schema of the
> topics is very large (around 1500 lines for each topic), so I want to read
> it from a file instead of manually typing the types in the program. How can
> I do that?
>
> 1. How do I specify the deserialization schema for multiple topics (3 topics)?
> 2. How do I read the JSON schema from a file?
>
> https://stackoverflow.com/q/70892579/13067721?sem=2
>
> Thanks in advance,
> Hussein
> Quiqup - Data Engineer
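
Putting the steps above together, here is a minimal end-to-end sketch in PyFlink. It is only illustrative: it assumes pyflink is installed with the Kafka connector jar available, and the file name "schema.json", the topic names, and the Kafka properties are all hypothetical placeholders.

```python
import json
from pathlib import Path

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Read the large (~1500-line) JSON schema from a file instead of inlining it.
# "schema.json" is a hypothetical file name.
json_schema_string = Path("schema.json").read_text()
json.loads(json_schema_string)  # fail fast if the file is not valid JSON

# The builder converts the JSON schema string into the TypeInformation
# that Flink needs for deserialization.
json_row_schema = (
    JsonRowDeserializationSchema.builder()
    .json_schema(json_schema_string)
    .build()
)

# Hypothetical broker address and consumer group.
kafka_props = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "demo-group",
}

# A single consumer can subscribe to all three topics by passing a
# list of topic names instead of a single string.
kafka_consumer = FlinkKafkaConsumer(
    ["topic-a", "topic-b", "topic-c"],  # hypothetical topic names
    json_row_schema,
    kafka_props,
)

env = StreamExecutionEnvironment.get_execution_environment()
stream = env.add_source(kafka_consumer)
```

Note that the single-consumer form only works if all three topics share the same schema; if each topic has its own schema, instantiate three consumers (one per schema) and add each one with its own env.add_source call.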