The ideal solution has two topics. Can we somehow do a flatMap and change the key 
without creating the internal repartition topic? That would allow us to 
skip using the internal repartition topic as the source for the KTable.
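
A minimal sketch of the two-topic layout, for illustration only. It assumes that to() acts as a plain sink, so no internal "-repartition" topic is created after the flatMap, and that the output topic is created by hand before the application starts. The topic names (devices-raw, devices-validated, devices-dlq), the Envelope and Device types, and the isValid predicate are hypothetical placeholders; the predicate stands in for whatever JSON schema validator is used per element.

```java
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;

final class DeviceTopology {

    // Hypothetical value types; the real ones come from the application's JSON model.
    record Device(String deviceId, String payload) {}
    record Envelope(List<Device> data) {}

    static KTable<String, Device> build(StreamsBuilder builder,
                                        Serde<Envelope> envelopeSerde,
                                        Serde<Device> deviceSerde,
                                        Predicate<String, Device> isValid) {
        // Explode each root message into one record per array element, keyed by deviceId.
        KStream<String, Device> elements = builder
                .stream("devices-raw", Consumed.with(Serdes.String(), envelopeSerde))
                .flatMap((ignoredKey, envelope) -> envelope.data().stream()
                        .map(d -> KeyValue.pair(d.deviceId(), d))
                        .toList());

        // Elements that fail validation are written to the dead-letter topic only.
        elements.filterNot(isValid)
                .to("devices-dlq", Produced.with(Serdes.String(), deviceSerde));

        // Valid elements are written to a regular, explicitly named topic.
        elements.filter(isValid)
                .to("devices-validated", Produced.with(Serdes.String(), deviceSerde));

        // The same topic is read back as the table source, so the topology never
        // has to know the name of an internal repartition topic.
        return builder.table("devices-validated", Consumed.with(Serdes.String(), deviceSerde));
    }
}
```

With this shape only valid elements are re-keyed and written to the table topic, and the topic name is owned by the application rather than derived from the application id.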

On 25 February 2025 at 15:41:19 CET, Bruno Cadonna <cado...@apache.org> wrote:
>Hi Pawel,
>
>I am not completely sure I understand the issue, because I am not a JSON 
>expert.
>
>Is my understanding correct that the serdes you use to read from the input 
>topic and from the repartition topic also validate the JSON?
>
>Regarding point 2 and 3:
>I agree that the dependency on the name of the repartition topic is not a good idea. 
>The naming of the repartition topic is rather an implementation detail that 
>should not be leaked. You could improve on it by writing the array elements to 
>a regular output topic that you name as you want. Then, you can read the 
>elements again from the output topic and write them into the table.
>
>Regarding point 1:
>Could you do the validation when you write the elements to the output topic?
>
>Best,
>Bruno
>
>On 25.02.25 14:20, Paweł Szymczyk wrote:
>> Dear Kafka users,
>> 
>> I spent the last few days working with Kafka Streams on a task that
>> looked very easy at first glance, but in the end I struggled with the
>> StreamsBuilder API and did something I am not proud of. Please help me, I am
>> open to any suggestions.
>> On the input topic we have messages whose data field contains an
>> array of individual devices received from a third-party company, for example:
>> {
>>      "request": {},
>>      "response": {},
>>      "data": [{
>>              "deviceId": "23dc78ffa6c1ad3c038b",
>>              "event": {
>>                  "lvl": 2,
>>                  "someValue": {
>>                      "horizontal": 9.448819160461426,
>>                      "vertical": 1.4566929340362549
>>                  },
>>                  "timestamp": "2024-04-08T02:06:31.000Z"
>>              }
>>          },
>>          {
>>              "deviceId": "23dc78ffa",
>>              "event": {
>>                  "lvl": 3,
>>                  "someValue": {
>>                      "horizontal": 11.149606704711914,
>>                      "vertical": 7.503936767578125
>>                  },
>>                  "consequence": 2,
>>                  "timestamp": "2024-04-08T02:06:49.000Z"
>>              }
>>          }
>>      ],
>>      "metadata": {}
>> }
>> 
>> A single message can contain tens or occasionally even hundreds of elements
>> inside the data array. This format is given to us by the external company, so
>> it is hard for us to change it.
>> 
>> I need to implement a two-stage validation:
>> 
>>     1. Validate the root structure (request, response, data, and metadata
>>     fields)
>>     2. Validate each element in the data array individually
>> 
>> When an individual element from the array fails validation:
>> 
>>     - Send only that element to a Dead Letter Topic
>>     - Continue processing other valid elements
>> 
>> When validation passes:
>> 
>>     - Send each valid element as a separate Kafka record with a new key (
>>     deviceId) to an output topic
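
The per-element stage described above can be sketched in plain Java, independent of Kafka. This is only an illustration: DeviceElement and the isValid rule are hypothetical stand-ins for the real element model and the per-element JSON schema check, and the third element in main is made up to show the invalid path.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical element type carrying only the fields needed for this sketch.
record DeviceElement(String deviceId, Integer lvl, String timestamp) {}

final class ElementValidation {

    // Placeholder rule standing in for the real per-element JSON schema check.
    static boolean isValid(DeviceElement e) {
        return e.deviceId() != null && !e.deviceId().isBlank()
                && e.lvl() != null
                && e.timestamp() != null;
    }

    // Splits the data array: key true holds valid elements,
    // key false holds the elements destined for the Dead Letter Topic.
    static Map<Boolean, List<DeviceElement>> partition(List<DeviceElement> data) {
        return data.stream()
                .collect(Collectors.partitioningBy(ElementValidation::isValid));
    }

    public static void main(String[] args) {
        List<DeviceElement> data = List.of(
                new DeviceElement("23dc78ffa6c1ad3c038b", 2, "2024-04-08T02:06:31.000Z"),
                new DeviceElement("23dc78ffa", 3, "2024-04-08T02:06:49.000Z"),
                new DeviceElement(null, 1, "2024-04-08T02:07:00.000Z")); // invalid: no deviceId
        Map<Boolean, List<DeviceElement>> result = partition(data);
        System.out.println("valid=" + result.get(true).size()
                + " invalid=" + result.get(false).size());
    }
}
```

One invalid element ends up in the false bucket without affecting the others, which is the behaviour a single root-level schema cannot give.
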
>> 
>> Now we have two separate, completely unconnected JSON schemas, one for the root
>> element and one for a single item of the array, and the code looks like this:
>> 
>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>> 
>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>                       Consumed.with(Serdes.String(), equipmentSerde))
>>               .flatMap((nullKey, devices) -> devices.getResponse()
>>                       .getData()
>>                       .getData()
>>                       .stream()
>>                       .map(simpleEquipment -> new KeyValue<>(
>>                               simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>                       .toList())
>>               .repartition(Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>                       .withKeySerde(Serdes.String())
>>                       .withValueSerde(nonValidatedDevicesSerde));
>> 
>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>          String.format("%s-%s-repartition",
>>                  properties.getEquipment().getApplicationId(), "non-validated-repartition"),
>>          Consumed.with(Serdes.String(), validatedDeviceSerde));
>> 
>> I would like to highlight two things about this code. First, with a single
>> schema, one invalid element in the data array fails validation of the whole
>> event, which is not acceptable. Second, we would rather not introduce manual
>> validation in the flatMap method; we would like to use automatic JSON schema
>> validation for that.
>> 
>> What we would like to improve:
>> 1. We're repartitioning all messages (including invalid ones), which is
>> inefficient.
>> 2. We have a fragile connection between the repartition method and table
>> method:
>> Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>> String.format("%s-%s-repartition",
>> properties.getEquipment().getApplicationId(), "non-validated-repartition")
>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>> pattern
>> 
>

Paweł Szymczyk 
