Hi Bruno,

Many thanks for trying to understand the problem, I prepare some repository
which has simple demo https://github.com/pszymczyk/streams-flatmap. You can
run /src/main/javaMain.main and then /src/test/java/SimpleTest, as the
result you will have 6 messages on custom-repartition topic and 3 messages
on validated-devices.

After writing that code I realized that what I actually like to have for
that case is sth like that (not compiling now):
streamsBuilder.stream("raw-devices", Consumed.with(Serdes.String(),
SerDeFactory.nonValidatedDevicesSerde()))
            .flatMap((key1, value1) -> value1.devices
                .stream()
                .map(nonValidatedDevice -> new
KeyValue<>(nonValidatedDevice.deviceId, nonValidatedDevice))
                .toList())
            .to("validated-devices", Produced.with(Serdes.String(),
SerDeFactory.validatedDeviceSerde()));

which means that I like to change the sink topic Serde on the fly without
mapping the objects manually.


śr., 26 lut 2025 o 07:46 Paweł Szymczyk <pawel.szymczy...@gmail.com>
napisał(a):

> I will provide you with the sample source code on GitHub today.
>
>
> Dnia 25 lutego 2025 20:47:36 CET, Bruno Cadonna <cado...@apache.org>
> napisał/a:
>
>> Hi Pawel,
>>
>> What is the "automatic json schema validation feature"?
>> Streams does not have such a thing built-in.
>>
>> Do you have an example of a third-party software that does that validation?
>>
>> Regarding decomposing your input records into the smaller array elements, 
>> that should work with
>>
>> builder.stream(...).flatMap(...).to(...)
>>
>> This should not create any repartition or changelog topics.
>>
>> Have you tried that?
>>
>> The question is where to put the validation of the array elements. That 
>> depends also from what you mean with "automatic json schema validation 
>> feature".
>>
>> Best,
>> Bruno
>>
>>
>>
>>
>> On 25.02.25 17:22, Paweł Szymczyk wrote:
>>
>>> To make it clear I will give you an example:
>>>
>>> On the source topic we have:
>>> offset: 0
>>> key: null
>>> value:
>>> {
>>>     "request":{
>>>
>>>     },
>>>     "response":{
>>>
>>>     },
>>>     "data":[
>>>        {
>>>           "deviceId":"23dc78ffa6c1ad3c038b",
>>>           "event":{
>>>              "lvl":2,
>>>              "someValue":{
>>>                 "horizontal":9.448819160461426,
>>>                 "vertical":1.4566929340362549
>>>              },
>>>              "timestamp":"2024-04-08T02:06:31.000Z"
>>>           }
>>>        },
>>>        {
>>>           "deviceId":"23dc78ffa",
>>>           "event":{
>>>              "lvl":3,
>>>              "someValue":{
>>>                 "horizontal":11.149606704711914,
>>>                 "vertical":7.503936767578125
>>>              },
>>>              "consequence":2,
>>>              "timestamp":"2024-04-08T02:06:49.000Z"
>>>           }
>>>        },
>>>        {
>>>           "some wrong data":{
>>>              "xx":3,
>>>              "xxx":{
>>>                 "xx":11.149606704711914,
>>>                 "yyy":7.503936767578125
>>>              }
>>>           }
>>>        }
>>>     ],
>>>     "metadata":{
>>>
>>>     }
>>> }
>>>
>>> On the sink topic we would like to have:
>>> offset: 0
>>> key: "23dc78ffa6c1ad3c038b"
>>> value:
>>> {
>>>     "deviceId":"23dc78ffa6c1ad3c038b",
>>>     "event":{
>>>        "lvl":2,
>>>        "someValue":{
>>>           "horizontal":9.448819160461426,
>>>           "vertical":1.4566929340362549
>>>        },
>>>        "timestamp":"2024-04-08T02:06:31.000Z"
>>>     }
>>> }
>>>
>>> offset: 1
>>> key: " 23dc78ffa"
>>> value:
>>> {
>>>     "deviceId":"23dc78ffa",
>>>     "event":{
>>>        "lvl":3,
>>>        "someValue":{
>>>           "horizontal":11.149606704711914,
>>>           "vertical":7.503936767578125
>>>        },
>>>        "consequence":2,
>>>        "timestamp":"2024-04-08T02:06:49.000Z"
>>>     }
>>> }
>>>
>>> And perfect soultion should use only two topics source and sink (no
>>> additional repartition and changelog), and as I mentioned before we don't
>>> like to introduce manual validation in the flatMap (like if null
>>> statements) method,
>>> we would like to use automatic json schema validation feature for that.
>>>
>>> wt., 25 lut 2025 o 17:02 Paweł Szymczyk <pawel.szymczy...@gmail.com>
>>> napisał(a):
>>>
>>> Ideal solution has two topics, can we somehow do flatMap and change a key
>>>> without changing creating the internal repartition topic? This will allow
>>>> us to skip using the internal repartition topic as source for k table.
>>>>
>>>>
>>>> Dnia 25 lutego 2025 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
>>>> napisał/a:
>>>>
>>>> Hi Pawel,
>>>>>
>>>>> I am not completely sure I understand the issue, because I am not a JSON 
>>>>> expert.
>>>>>
>>>>> Is my understanding correct that the serde that you use to read from the 
>>>>> input topic and from the repartition topic also do the validation of the 
>>>>> JSON?
>>>>>
>>>>> Regarding point 2 and 3:
>>>>> I agree that the dependency on the name of the repartition is not a good 
>>>>> idea. The naming of the repartition topic is rather an implementation 
>>>>> detail that should not be leaked. You could improve on it by writing the 
>>>>> array elements to a regular output topic that you name as you want. Then, 
>>>>> you can read the elements again from the output topic and write them into 
>>>>> the table.
>>>>>
>>>>> Regarding point 1:
>>>>> Could you do the validation when you write to the elements to the output 
>>>>> topic?
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>>>>
>>>>> Dear Kafka users,
>>>>>>
>>>>>> The last few days I spent working with Kafka Streams on some tasks which
>>>>>> looked very easy at first glance but finally I struggled with the Streams
>>>>>> Builder API and did something which I am not proud of. Please help me, I 
>>>>>> am
>>>>>> open to any suggestions.
>>>>>> On the input topic we have a message where inside field data we have an
>>>>>> array of particular devices received from 3party company, for example:
>>>>>> {
>>>>>>       "request": {},
>>>>>>       "response": {},
>>>>>>       "data": [{
>>>>>>               "deviceId": "23dc78ffa6c1ad3c038b",
>>>>>>               "event": {
>>>>>>                   "lvl": 2,
>>>>>>                   "someValue": {
>>>>>>                       "horizontal": 9.448819160461426,
>>>>>>                       "vertical": 1.4566929340362549
>>>>>>                   },
>>>>>>                   "timestamp": "2024-04-08T02:06:31.000Z"
>>>>>>               }
>>>>>>           },
>>>>>>           {
>>>>>>               "deviceId": "23dc78ffa",
>>>>>>               "event": {
>>>>>>                   "lvl": 3,
>>>>>>                   "someValue": {
>>>>>>                       "horizontal": 11.149606704711914,
>>>>>>                       "vertical": 7.503936767578125
>>>>>>                   },
>>>>>>                   "consequence": 2,
>>>>>>                   "timestamp": "2024-04-08T02:06:49.000Z"
>>>>>>               }
>>>>>>           }
>>>>>>       ],
>>>>>>       "metadata": {}
>>>>>> }
>>>>>>
>>>>>> Single message can contain tens or even occasionally hundreds of elements
>>>>>> inside data array. This part is given to us by the external company so it
>>>>>> is hard to do anything with that.
>>>>>>
>>>>>> I need to implement a two-stage validation:
>>>>>>
>>>>>>      1. Validate the root structure (request, response, data, and 
>>>>>> metadata
>>>>>>      fields)
>>>>>>      2. Validate each element in the data array individually
>>>>>>
>>>>>> When an individual element from the array fails validation:
>>>>>>
>>>>>>      - Send only that element to a Dead Letter Topic
>>>>>>      - Continue processing other valid elements
>>>>>>
>>>>>> When validation passes:
>>>>>>
>>>>>>      - Send each valid element as a separate Kafka record with a new key 
>>>>>> (
>>>>>>      deviceId) to an output topic
>>>>>>
>>>>>> Now we have two separated, totally not connected json schemas, one for 
>>>>>> root
>>>>>> element second for single item from an array and the code looks like 
>>>>>> that:
>>>>>>
>>>>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>>>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>>>>
>>>>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>>>>> Consumed.with(Serdes.String(), equipmentSerde))
>>>>>>                 .flatMap((nullKey, devices) -> devices.getResponse()
>>>>>>                                                             .getData()
>>>>>>                                                             .getData()
>>>>>>                                                             .stream()
>>>>>>
>>>>>> .map(simpleEquipment -> new
>>>>>> KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>>>>                                                             .toList())
>>>>>>                 .repartition(Repartitioned.<String,
>>>>>> NonValidatedDevices>as("non-validated-repartition").withKeySerde(Serdes.String()).withValueSerde(nonValidatedDevicesSerde));
>>>>>>
>>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>>>>           String.format("%s-%s-repartition",
>>>>>> properties.getEquipment().getApplicationId(), 
>>>>>> "non-validated-repartition"),
>>>>>>           Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>>>>
>>>>>> I would like to higlight two things about that code. When we have a one
>>>>>> schema then single not valid element in data array fails whole event
>>>>>> validation which is not acceptable. Second thing is that we don't like to
>>>>>> introduce manual validation in the flatMap method, we would like to use
>>>>>> automatic json schema for that.
>>>>>>
>>>>>> What we would like to improve:
>>>>>> 1. We're repartitioning all messages (including invalid ones), which is
>>>>>> inefficient.
>>>>>> 2. We have a fragile connection between the repartition method and table
>>>>>> method:
>>>>>> Repartitioned.<String, 
>>>>>> NonValidatedDevices>as("non-validated-repartition")
>>>>>> String.format("%s-%s-repartition",
>>>>>> properties.getEquipment().getApplicationId(), 
>>>>>> "non-validated-repartition")
>>>>>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>>>>>> pattern
>>>>>>
>>>>>>
>>>>>> Paweł Szymczyk
>>>>>
>>>>
>>>>
>>>
>>>
>> Paweł Szymczyk
>


-- 
Pozdrawiam
Paweł Szymczyk

Reply via email to