Sorry for that, I should be more precise in that from the beginning. By 
automatic schema validation I mean SerDe combined with Default Deserialisation 
Exception Handler along with dlq topic as you can find in official streams 
documentation 
<https://kafka.apache.org/23/documentation/streams/developer-guide/config-streams.html#default-deserialization-exception-handler>.

Dnia 25 lutego 2025 20:47:36 CET, Bruno Cadonna <cado...@apache.org> napisał/a:
>Hi Pawel,
>
>What is the "automatic json schema validation feature"?
>Streams does not have such a thing built-in.
>
>Do you have an example of a third-party software that does that validation?
>
>Regarding decomposing your input records into the smaller array elements, that 
>should work with
>
>builder.stream(...).flatMap(...).to(...)
>
>This should not create any repartition or changelog topics.
>
>Have you tried that?
>
>The question is where to put the validation of the array elements. That 
>depends also from what you mean with "automatic json schema validation 
>feature".
>
>Best,
>Bruno
>
>
>
>
>On 25.02.25 17:22, Paweł Szymczyk wrote:
>> To make it clear I will give you an example:
>> 
>> On the source topic we have:
>> offset: 0
>> key: null
>> value:
>> {
>>     "request":{
>> 
>>     },
>>     "response":{
>> 
>>     },
>>     "data":[
>>        {
>>           "deviceId":"23dc78ffa6c1ad3c038b",
>>           "event":{
>>              "lvl":2,
>>              "someValue":{
>>                 "horizontal":9.448819160461426,
>>                 "vertical":1.4566929340362549
>>              },
>>              "timestamp":"2024-04-08T02:06:31.000Z"
>>           }
>>        },
>>        {
>>           "deviceId":"23dc78ffa",
>>           "event":{
>>              "lvl":3,
>>              "someValue":{
>>                 "horizontal":11.149606704711914,
>>                 "vertical":7.503936767578125
>>              },
>>              "consequence":2,
>>              "timestamp":"2024-04-08T02:06:49.000Z"
>>           }
>>        },
>>        {
>>           "some wrong data":{
>>              "xx":3,
>>              "xxx":{
>>                 "xx":11.149606704711914,
>>                 "yyy":7.503936767578125
>>              }
>>           }
>>        }
>>     ],
>>     "metadata":{
>> 
>>     }
>> }
>> 
>> On the sink topic we would like to have:
>> offset: 0
>> key: "23dc78ffa6c1ad3c038b"
>> value:
>> {
>>     "deviceId":"23dc78ffa6c1ad3c038b",
>>     "event":{
>>        "lvl":2,
>>        "someValue":{
>>           "horizontal":9.448819160461426,
>>           "vertical":1.4566929340362549
>>        },
>>        "timestamp":"2024-04-08T02:06:31.000Z"
>>     }
>> }
>> 
>> offset: 1
>> key: " 23dc78ffa"
>> value:
>> {
>>     "deviceId":"23dc78ffa",
>>     "event":{
>>        "lvl":3,
>>        "someValue":{
>>           "horizontal":11.149606704711914,
>>           "vertical":7.503936767578125
>>        },
>>        "consequence":2,
>>        "timestamp":"2024-04-08T02:06:49.000Z"
>>     }
>> }
>> 
>> And perfect soultion should use only two topics source and sink (no
>> additional repartition and changelog), and as I mentioned before we don't
>> like to introduce manual validation in the flatMap (like if null
>> statements) method,
>> we would like to use automatic json schema validation feature for that.
>> 
>> wt., 25 lut 2025 o 17:02 Paweł Szymczyk <pawel.szymczy...@gmail.com>
>> napisał(a):
>> 
>>> Ideal solution has two topics, can we somehow do flatMap and change a key
>>> without changing creating the internal repartition topic? This will allow
>>> us to skip using the internal repartition topic as source for k table.
>>> 
>>> 
>>> Dnia 25 lutego 2025 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
>>> napisał/a:
>>> 
>>>> Hi Pawel,
>>>> 
>>>> I am not completely sure I understand the issue, because I am not a JSON 
>>>> expert.
>>>> 
>>>> Is my understanding correct that the serde that you use to read from the 
>>>> input topic and from the repartition topic also do the validation of the 
>>>> JSON?
>>>> 
>>>> Regarding point 2 and 3:
>>>> I agree that the dependency on the name of the repartition is not a good 
>>>> idea. The naming of the repartition topic is rather an implementation 
>>>> detail that should not be leaked. You could improve on it by writing the 
>>>> array elements to a regular output topic that you name as you want. Then, 
>>>> you can read the elements again from the output topic and write them into 
>>>> the table.
>>>> 
>>>> Regarding point 1:
>>>> Could you do the validation when you write to the elements to the output 
>>>> topic?
>>>> 
>>>> Best,
>>>> Bruno
>>>> 
>>>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>>> 
>>>>> Dear Kafka users,
>>>>> 
>>>>> The last few days I spent working with Kafka Streams on some tasks which
>>>>> looked very easy at first glance but finally I struggled with the Streams
>>>>> Builder API and did something which I am not proud of. Please help me, I 
>>>>> am
>>>>> open to any suggestions.
>>>>> On the input topic we have a message where inside field data we have an
>>>>> array of particular devices received from 3party company, for example:
>>>>> {
>>>>>       "request": {},
>>>>>       "response": {},
>>>>>       "data": [{
>>>>>               "deviceId": "23dc78ffa6c1ad3c038b",
>>>>>               "event": {
>>>>>                   "lvl": 2,
>>>>>                   "someValue": {
>>>>>                       "horizontal": 9.448819160461426,
>>>>>                       "vertical": 1.4566929340362549
>>>>>                   },
>>>>>                   "timestamp": "2024-04-08T02:06:31.000Z"
>>>>>               }
>>>>>           },
>>>>>           {
>>>>>               "deviceId": "23dc78ffa",
>>>>>               "event": {
>>>>>                   "lvl": 3,
>>>>>                   "someValue": {
>>>>>                       "horizontal": 11.149606704711914,
>>>>>                       "vertical": 7.503936767578125
>>>>>                   },
>>>>>                   "consequence": 2,
>>>>>                   "timestamp": "2024-04-08T02:06:49.000Z"
>>>>>               }
>>>>>           }
>>>>>       ],
>>>>>       "metadata": {}
>>>>> }
>>>>> 
>>>>> Single message can contain tens or even occasionally hundreds of elements
>>>>> inside data array. This part is given to us by the external company so it
>>>>> is hard to do anything with that.
>>>>> 
>>>>> I need to implement a two-stage validation:
>>>>> 
>>>>>      1. Validate the root structure (request, response, data, and metadata
>>>>>      fields)
>>>>>      2. Validate each element in the data array individually
>>>>> 
>>>>> When an individual element from the array fails validation:
>>>>> 
>>>>>      - Send only that element to a Dead Letter Topic
>>>>>      - Continue processing other valid elements
>>>>> 
>>>>> When validation passes:
>>>>> 
>>>>>      - Send each valid element as a separate Kafka record with a new key (
>>>>>      deviceId) to an output topic
>>>>> 
>>>>> Now we have two separated, totally not connected json schemas, one for 
>>>>> root
>>>>> element second for single item from an array and the code looks like that:
>>>>> 
>>>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>>> 
>>>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>>>> Consumed.with(Serdes.String(), equipmentSerde))
>>>>>                 .flatMap((nullKey, devices) -> devices.getResponse()
>>>>>                                                             .getData()
>>>>>                                                             .getData()
>>>>>                                                             .stream()
>>>>> 
>>>>> .map(simpleEquipment -> new
>>>>> KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>>>                                                             .toList())
>>>>>                 .repartition(Repartitioned.<String,
>>>>> NonValidatedDevices>as("non-validated-repartition").withKeySerde(Serdes.String()).withValueSerde(nonValidatedDevicesSerde));
>>>>> 
>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>>>           String.format("%s-%s-repartition",
>>>>> properties.getEquipment().getApplicationId(), 
>>>>> "non-validated-repartition"),
>>>>>           Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>>> 
>>>>> I would like to higlight two things about that code. When we have a one
>>>>> schema then single not valid element in data array fails whole event
>>>>> validation which is not acceptable. Second thing is that we don't like to
>>>>> introduce manual validation in the flatMap method, we would like to use
>>>>> automatic json schema for that.
>>>>> 
>>>>> What we would like to improve:
>>>>> 1. We're repartitioning all messages (including invalid ones), which is
>>>>> inefficient.
>>>>> 2. We have a fragile connection between the repartition method and table
>>>>> method:
>>>>> Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>> String.format("%s-%s-repartition",
>>>>> properties.getEquipment().getApplicationId(), "non-validated-repartition")
>>>>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>>>>> pattern
>>>>> 
>>>>> 
>>>> Paweł Szymczyk
>>> 
>> 
>> 
>

Paweł Szymczyk 

Reply via email to