Hello Hans,

As far as I know, the JSONKeyValueDeserializationSchema returns a Jackson
ObjectNode with "key" and "value" fields (plus "metadata" if you pass true
to the constructor). Below I have included an example based on the Flink
stable documentation.

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    // JSONKeyValueDeserializationSchema is a KafkaDeserializationSchema, so it
    // has to be wrapped before KafkaSource.setDeserializer accepts it.
    .setDeserializer(KafkaRecordDeserializationSchema.of(
        new JSONKeyValueDeserializationSchema(false)))
    .build();

DataStream<ObjectNode> ds = env.fromSource(source,
    WatermarkStrategy.noWatermarks(), "Kafka Source");

// Below we access the JSON field stored in the ObjectNode.
DataStream<String> processedDs = ds.map(record ->
    record.get("value").get("my-field").asText());

It is also possible to implement your own deserialization schema that, for
example, turns each record into a POJO. You can do this by implementing
KafkaDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>
and, when using KafkaSource, wrapping it with
KafkaRecordDeserializationSchema.of(...) before passing it to
.setDeserializer(...).

If you are only interested in the value of the Kafka record, you can instead
extend AbstractDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html>
and use .setValueOnlyDeserializer(new CustomDeserializer()). The builder
methods that accept a deserializer are documented under KafkaSourceBuilder
<https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>.

A custom deserializer will be the same for older Flink versions, but
KafkaSource itself was added only recently. To learn about the previous
Kafka source (FlinkKafkaConsumer), see
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction>.
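
In case it helps, here is a rough sketch of the value-only variant for
nested JSON. OrderEvent, its fields, and the JSON layout are made up for
illustration, and I am assuming plain jackson-databind is on the classpath
next to Flink. Treat it as a starting point, not as the recommended way.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;

// Hypothetical target type; the field names are assumptions for this sketch.
// In a real job this would normally be a public class in its own file so
// that Flink can treat it as a POJO.
class OrderEvent {
    public String orderId;
    public String customerCity;
    public double amount;

    public OrderEvent() {}
}

// Value-only deserializer: receives the raw bytes of the Kafka record value.
class OrderEventDeserializer extends AbstractDeserializationSchema<OrderEvent> {
    private static final long serialVersionUID = 1L;

    // Transient and created lazily, so the mapper is not serialized with the job.
    private transient ObjectMapper mapper;

    @Override
    public OrderEvent deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        JsonNode root = mapper.readTree(message);

        OrderEvent event = new OrderEvent();
        // path() returns a "missing" node instead of null, so an absent
        // level does not cause a NullPointerException.
        event.orderId = root.path("order").path("id").asText();
        // at() takes a JSON Pointer, which is convenient for deeper nesting.
        event.customerCity = root.at("/order/customer/address/city").asText();
        event.amount = root.path("order").path("amount").asDouble();
        return event;
    }
}
```

You would then build a KafkaSource<OrderEvent> and call
.setValueOnlyDeserializer(new OrderEventDeserializer()) instead of
.setDeserializer(...). Missing fields come back as an empty string or 0.0
here; whether that is acceptable or should be an error depends on your data.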

Best Regards
Kamil

On Fri, 14 Jan 2022 at 18:37, HG <hanspeter.sl...@gmail.com> wrote:

> Hi,
>
> Before starting programming myself I'd like to know whether there are good
> examples with deserialization of JSON that I can borrow.
> The structure of the JSON is nested with multiple levels.
>
> Any references?
>
> 'better well stolen than badly invented myself' we'd say in Dutch😁
>
> Regards Hans
>
