Hi Aissa,

You can do this easily with Flink SQL by defining a Kafka table
using Flink DDL:

CREATE TABLE sensor_logs (
    `date` STRING,
    `main` ROW<
        `ph` DOUBLE,
        `whc` DOUBLE,
        `temperature` DOUBLE,
        `humidity` DOUBLE>,
    `id` BIGINT,
    `coord` ROW<
         `lat` DOUBLE,
         `lon` DOUBLE>
) WITH (
    'connector.type' = 'kafka',  -- use the kafka connector
    'connector.version' = 'universal',  -- 'universal' supports Kafka 0.11+
    'connector.topic' = 'sensor_logs',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- read from the beginning
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address
    'format.type' = 'json'  -- the data format is json
);

Then you can run a `SELECT * FROM sensor_logs` query to get the
structured data.
You can refer to the documentation for more details [1]. Please remember to
add the required dependency jars to your cluster [2].
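
Since your JSON has nested objects, you can also read individual nested
fields directly; for example, a sketch against the table defined above
(the column aliases are just illustrative):

SELECT
    `id`,
    `date`,
    sensor_logs.`main`.`temperature` AS temperature,
    sensor_logs.`main`.`humidity` AS humidity,
    sensor_logs.`coord`.`lat` AS lat,
    sensor_logs.`coord`.`lon` AS lon
FROM sensor_logs;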

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#kafka-connector
[2]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#dependencies

On Tue, 5 May 2020 at 07:54, Aissa Elaffani <aissaelaff...@gmail.com> wrote:

> Hello,
> Please can you share with me, some demos or examples of deserialization
> with flink.
> I need to consume some kafka message produced by sensors in JSON format.
> here is my JSON message :
> {"date": "2018-05-31 15:10", "main": {"ph": 5.0, "whc": 60.0,
> "temperature": 9.5, "humidity": 96}, "id": 2582, "coord": {"lat": 57.79,
> "lon": -54.58}}
>
>
