Hi Pawel,

What is the "automatic json schema validation feature"?
Streams does not have such a thing built-in.

Do you have an example of a third-party software that does that validation?

Regarding decomposing your input records into the smaller array elements, that should work with

builder.stream(...).flatMap(...).to(...)

This should not create any repartition or changelog topics.

Have you tried that?

The question is where to put the validation of the array elements. That depends also from what you mean with "automatic json schema validation feature".

Best,
Bruno




On 25.02.25 17:22, Paweł Szymczyk wrote:
To make it clear I will give you an example:

On the source topic we have:
offset: 0
key: null
value:
{
    "request":{

    },
    "response":{

    },
    "data":[
       {
          "deviceId":"23dc78ffa6c1ad3c038b",
          "event":{
             "lvl":2,
             "someValue":{
                "horizontal":9.448819160461426,
                "vertical":1.4566929340362549
             },
             "timestamp":"2024-04-08T02:06:31.000Z"
          }
       },
       {
          "deviceId":"23dc78ffa",
          "event":{
             "lvl":3,
             "someValue":{
                "horizontal":11.149606704711914,
                "vertical":7.503936767578125
             },
             "consequence":2,
             "timestamp":"2024-04-08T02:06:49.000Z"
          }
       },
       {
          "some wrong data":{
             "xx":3,
             "xxx":{
                "xx":11.149606704711914,
                "yyy":7.503936767578125
             }
          }
       }
    ],
    "metadata":{

    }
}

On the sink topic we would like to have:
offset: 0
key: "23dc78ffa6c1ad3c038b"
value:
{
    "deviceId":"23dc78ffa6c1ad3c038b",
    "event":{
       "lvl":2,
       "someValue":{
          "horizontal":9.448819160461426,
          "vertical":1.4566929340362549
       },
       "timestamp":"2024-04-08T02:06:31.000Z"
    }
}

offset: 1
key: " 23dc78ffa"
value:
{
    "deviceId":"23dc78ffa",
    "event":{
       "lvl":3,
       "someValue":{
          "horizontal":11.149606704711914,
          "vertical":7.503936767578125
       },
       "consequence":2,
       "timestamp":"2024-04-08T02:06:49.000Z"
    }
}

And perfect soultion should use only two topics source and sink (no
additional repartition and changelog), and as I mentioned before we don't
like to introduce manual validation in the flatMap (like if null
statements) method,
we would like to use automatic json schema validation feature for that.

wt., 25 lut 2025 o 17:02 Paweł Szymczyk <pawel.szymczy...@gmail.com>
napisał(a):

Ideal solution has two topics, can we somehow do flatMap and change a key
without changing creating the internal repartition topic? This will allow
us to skip using the internal repartition topic as source for k table.


Dnia 25 lutego 2025 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
napisał/a:

Hi Pawel,

I am not completely sure I understand the issue, because I am not a JSON expert.

Is my understanding correct that the serde that you use to read from the input 
topic and from the repartition topic also do the validation of the JSON?

Regarding point 2 and 3:
I agree that the dependency on the name of the repartition is not a good idea. 
The naming of the repartition topic is rather an implementation detail that 
should not be leaked. You could improve on it by writing the array elements to 
a regular output topic that you name as you want. Then, you can read the 
elements again from the output topic and write them into the table.

Regarding point 1:
Could you do the validation when you write to the elements to the output topic?

Best,
Bruno

On 25.02.25 14:20, Paweł Szymczyk wrote:

Dear Kafka users,

The last few days I spent working with Kafka Streams on some tasks which
looked very easy at first glance but finally I struggled with the Streams
Builder API and did something which I am not proud of. Please help me, I am
open to any suggestions.
On the input topic we have a message where inside field data we have an
array of particular devices received from 3party company, for example:
{
      "request": {},
      "response": {},
      "data": [{
              "deviceId": "23dc78ffa6c1ad3c038b",
              "event": {
                  "lvl": 2,
                  "someValue": {
                      "horizontal": 9.448819160461426,
                      "vertical": 1.4566929340362549
                  },
                  "timestamp": "2024-04-08T02:06:31.000Z"
              }
          },
          {
              "deviceId": "23dc78ffa",
              "event": {
                  "lvl": 3,
                  "someValue": {
                      "horizontal": 11.149606704711914,
                      "vertical": 7.503936767578125
                  },
                  "consequence": 2,
                  "timestamp": "2024-04-08T02:06:49.000Z"
              }
          }
      ],
      "metadata": {}
}

Single message can contain tens or even occasionally hundreds of elements
inside data array. This part is given to us by the external company so it
is hard to do anything with that.

I need to implement a two-stage validation:

     1. Validate the root structure (request, response, data, and metadata
     fields)
     2. Validate each element in the data array individually

When an individual element from the array fails validation:

     - Send only that element to a Dead Letter Topic
     - Continue processing other valid elements

When validation passes:

     - Send each valid element as a separate Kafka record with a new key (
     deviceId) to an output topic

Now we have two separated, totally not connected json schemas, one for root
element second for single item from an array and the code looks like that:

Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
Serde<ValidatedDevice> validatedDeviceSerde = ...;

streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
Consumed.with(Serdes.String(), equipmentSerde))
                .flatMap((nullKey, devices) -> devices.getResponse()
                                                            .getData()
                                                            .getData()
                                                            .stream()

.map(simpleEquipment -> new
KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
                                                            .toList())
                .repartition(Repartitioned.<String,
NonValidatedDevices>as("non-validated-repartition").withKeySerde(Serdes.String()).withValueSerde(nonValidatedDevicesSerde));

KTable<String, ValidatedDevice> device = streamsBuilder.table(
          String.format("%s-%s-repartition",
properties.getEquipment().getApplicationId(), "non-validated-repartition"),
          Consumed.with(Serdes.String(), validatedDeviceSerde));

I would like to higlight two things about that code. When we have a one
schema then single not valid element in data array fails whole event
validation which is not acceptable. Second thing is that we don't like to
introduce manual validation in the flatMap method, we would like to use
automatic json schema for that.

What we would like to improve:
1. We're repartitioning all messages (including invalid ones), which is
inefficient.
2. We have a fragile connection between the repartition method and table
method:
Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
String.format("%s-%s-repartition",
properties.getEquipment().getApplicationId(), "non-validated-repartition")
3. Kafka Streams internals are unnecessarily exposed in the topic naming
pattern


Paweł Szymczyk




Reply via email to