Hi Aissa, You can easily do this by using Flink SQL, you can define a kafka table using Flink DDL:
CREATE TABLE sensor_logs ( `date` STRING, `main` ROW< `ph` DOUBLE, `whc` DOUBLE, `temperature` DOUBLE, `humidity` DOUBLE>, `id` BIGINT, `coord` ROW< `lat` DOUBLE, `lon` DOUBLE> ) WITH ( 'connector.type' = 'kafka', -- using kafka connector 'connector.version' = 'universal', -- kafka version, universal supports Kafka 0.11+ 'connector.topic' = 'sensor_logs', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- reading from the beginning 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper address 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker address 'format.type' = 'json' -- the data format is json ); And then, you can use `SELECT * FROM logs` query to get the structrued data. You can refer the documentation about more details [1]. Please remember to add the required depency jars into your cluster[2]. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#kafka-connector [2]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#dependencies On Tue, 5 May 2020 at 07:54, Aissa Elaffani <aissaelaff...@gmail.com> wrote: > Hello, > Please can you share with me, some demos or examples of deserialization > with flink. > I need to consume some kafka message produced by sensors in JSON format. > here is my JSON message : > {"date": "2018-05-31 15:10", "main": {"ph": 5.0, "whc": 60.0, > "temperature": 9.5, "humidity": 96}, "id": 2582, "coord": {"lat": 57.79, > "lon": -54.58}} > >