To make it clear I will give you an example:

On the source topic we have:
offset: 0
key: null
value:
{
   "request":{

   },
   "response":{

   },
   "data":[
      {
         "deviceId":"23dc78ffa6c1ad3c038b",
         "event":{
            "lvl":2,
            "someValue":{
               "horizontal":9.448819160461426,
               "vertical":1.4566929340362549
            },
            "timestamp":"2024-04-08T02:06:31.000Z"
         }
      },
      {
         "deviceId":"23dc78ffa",
         "event":{
            "lvl":3,
            "someValue":{
               "horizontal":11.149606704711914,
               "vertical":7.503936767578125
            },
            "consequence":2,
            "timestamp":"2024-04-08T02:06:49.000Z"
         }
      },
      {
         "some wrong data":{
            "xx":3,
            "xxx":{
               "xx":11.149606704711914,
               "yyy":7.503936767578125
            }
         }
      }
   ],
   "metadata":{

   }
}

On the sink topic we would like to have:
offset: 0
key: "23dc78ffa6c1ad3c038b"
value:
{
   "deviceId":"23dc78ffa6c1ad3c038b",
   "event":{
      "lvl":2,
      "someValue":{
         "horizontal":9.448819160461426,
         "vertical":1.4566929340362549
      },
      "timestamp":"2024-04-08T02:06:31.000Z"
   }
}

offset: 1
key: " 23dc78ffa"
value:
{
   "deviceId":"23dc78ffa",
   "event":{
      "lvl":3,
      "someValue":{
         "horizontal":11.149606704711914,
         "vertical":7.503936767578125
      },
      "consequence":2,
      "timestamp":"2024-04-08T02:06:49.000Z"
   }
}

And perfect soultion should use only two topics source and sink (no
additional repartition and changelog), and as I mentioned before we don't
like to introduce manual validation in the flatMap (like if null
statements) method,
we would like to use automatic json schema validation feature for that.

wt., 25 lut 2025 o 17:02 Paweł Szymczyk <pawel.szymczy...@gmail.com>
napisał(a):

> Ideal solution has two topics, can we somehow do flatMap and change a key
> without changing creating the internal repartition topic? This will allow
> us to skip using the internal repartition topic as source for k table.
>
>
> Dnia 25 lutego 2025 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
> napisał/a:
>
>> Hi Pawel,
>>
>> I am not completely sure I understand the issue, because I am not a JSON 
>> expert.
>>
>> Is my understanding correct that the serde that you use to read from the 
>> input topic and from the repartition topic also do the validation of the 
>> JSON?
>>
>> Regarding point 2 and 3:
>> I agree that the dependency on the name of the repartition is not a good 
>> idea. The naming of the repartition topic is rather an implementation detail 
>> that should not be leaked. You could improve on it by writing the array 
>> elements to a regular output topic that you name as you want. Then, you can 
>> read the elements again from the output topic and write them into the table.
>>
>> Regarding point 1:
>> Could you do the validation when you write to the elements to the output 
>> topic?
>>
>> Best,
>> Bruno
>>
>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>
>>> Dear Kafka users,
>>>
>>> The last few days I spent working with Kafka Streams on some tasks which
>>> looked very easy at first glance but finally I struggled with the Streams
>>> Builder API and did something which I am not proud of. Please help me, I am
>>> open to any suggestions.
>>> On the input topic we have a message where inside field data we have an
>>> array of particular devices received from 3party company, for example:
>>> {
>>>      "request": {},
>>>      "response": {},
>>>      "data": [{
>>>              "deviceId": "23dc78ffa6c1ad3c038b",
>>>              "event": {
>>>                  "lvl": 2,
>>>                  "someValue": {
>>>                      "horizontal": 9.448819160461426,
>>>                      "vertical": 1.4566929340362549
>>>                  },
>>>                  "timestamp": "2024-04-08T02:06:31.000Z"
>>>              }
>>>          },
>>>          {
>>>              "deviceId": "23dc78ffa",
>>>              "event": {
>>>                  "lvl": 3,
>>>                  "someValue": {
>>>                      "horizontal": 11.149606704711914,
>>>                      "vertical": 7.503936767578125
>>>                  },
>>>                  "consequence": 2,
>>>                  "timestamp": "2024-04-08T02:06:49.000Z"
>>>              }
>>>          }
>>>      ],
>>>      "metadata": {}
>>> }
>>>
>>> Single message can contain tens or even occasionally hundreds of elements
>>> inside data array. This part is given to us by the external company so it
>>> is hard to do anything with that.
>>>
>>> I need to implement a two-stage validation:
>>>
>>>     1. Validate the root structure (request, response, data, and metadata
>>>     fields)
>>>     2. Validate each element in the data array individually
>>>
>>> When an individual element from the array fails validation:
>>>
>>>     - Send only that element to a Dead Letter Topic
>>>     - Continue processing other valid elements
>>>
>>> When validation passes:
>>>
>>>     - Send each valid element as a separate Kafka record with a new key (
>>>     deviceId) to an output topic
>>>
>>> Now we have two separated, totally not connected json schemas, one for root
>>> element second for single item from an array and the code looks like that:
>>>
>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>
>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>> Consumed.with(Serdes.String(), equipmentSerde))
>>>                .flatMap((nullKey, devices) -> devices.getResponse()
>>>                                                            .getData()
>>>                                                            .getData()
>>>                                                            .stream()
>>>
>>> .map(simpleEquipment -> new
>>> KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>                                                            .toList())
>>>                .repartition(Repartitioned.<String,
>>> NonValidatedDevices>as("non-validated-repartition").withKeySerde(Serdes.String()).withValueSerde(nonValidatedDevicesSerde));
>>>
>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>          String.format("%s-%s-repartition",
>>> properties.getEquipment().getApplicationId(), "non-validated-repartition"),
>>>          Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>
>>> I would like to higlight two things about that code. When we have a one
>>> schema then single not valid element in data array fails whole event
>>> validation which is not acceptable. Second thing is that we don't like to
>>> introduce manual validation in the flatMap method, we would like to use
>>> automatic json schema for that.
>>>
>>> What we would like to improve:
>>> 1. We're repartitioning all messages (including invalid ones), which is
>>> inefficient.
>>> 2. We have a fragile connection between the repartition method and table
>>> method:
>>> Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>> String.format("%s-%s-repartition",
>>> properties.getEquipment().getApplicationId(), "non-validated-repartition")
>>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>>> pattern
>>>
>>>
>> Paweł Szymczyk
>


-- 
Pozdrawiam
Paweł Szymczyk

Reply via email to