Sorry for that, I should have been more precise about this from the beginning. By automatic schema validation I mean a SerDe combined with the default deserialization exception handler and a DLQ topic, as described in the official Streams documentation <https://kafka.apache.org/23/documentation/streams/developer-guide/config-streams.html#default-deserialization-exception-handler>.
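Roughly, what I have in mind looks like this. This is only a minimal sketch: the DlqDeserializationExceptionHandler class name, the "devices-dlq" topic and the producer wiring are placeholders I made up for the example, not our real code.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DlqDeserializationExceptionHandler implements DeserializationExceptionHandler {

    // placeholder DLQ topic name
    private static final String DLQ_TOPIC = "devices-dlq";

    private Producer<byte[], byte[]> dlqProducer;

    @Override
    public void configure(final Map<String, ?> configs) {
        // Streams passes its configuration map here; reuse the bootstrap servers
        // to build a plain byte-array producer for the DLQ
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        dlqProducer = new KafkaProducer<>(producerProps,
                new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // send the raw bytes of the record that failed deserialization/validation
        // to the DLQ and keep processing the rest of the stream
        dlqProducer.send(new ProducerRecord<>(DLQ_TOPIC, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;
    }
}

The handler is registered in the application's existing Streams Properties:

streamsConfiguration.put(
        StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        DlqDeserializationExceptionHandler.class);

The JSON schema check itself lives in the value serde, so an element that does not match the schema throws during deserialization and ends up on the DLQ instead of failing the whole application.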
On 25 February 2025 at 20:47:36 CET, Bruno Cadonna <cado...@apache.org> wrote:
>Hi Pawel,
>
>What is the "automatic json schema validation feature"?
>Streams does not have such a thing built-in.
>
>Do you have an example of third-party software that does that validation?
>
>Regarding decomposing your input records into the smaller array elements,
>that should work with
>
>builder.stream(...).flatMap(...).to(...)
>
>This should not create any repartition or changelog topics.
>
>Have you tried that?
>
>The question is where to put the validation of the array elements. That
>also depends on what you mean by "automatic json schema validation
>feature".
>
>Best,
>Bruno
>
>On 25.02.25 17:22, Paweł Szymczyk wrote:
>> To make it clear I will give you an example:
>>
>> On the source topic we have:
>> offset: 0
>> key: null
>> value:
>> {
>>    "request":{
>>    },
>>    "response":{
>>    },
>>    "data":[
>>       {
>>          "deviceId":"23dc78ffa6c1ad3c038b",
>>          "event":{
>>             "lvl":2,
>>             "someValue":{
>>                "horizontal":9.448819160461426,
>>                "vertical":1.4566929340362549
>>             },
>>             "timestamp":"2024-04-08T02:06:31.000Z"
>>          }
>>       },
>>       {
>>          "deviceId":"23dc78ffa",
>>          "event":{
>>             "lvl":3,
>>             "someValue":{
>>                "horizontal":11.149606704711914,
>>                "vertical":7.503936767578125
>>             },
>>             "consequence":2,
>>             "timestamp":"2024-04-08T02:06:49.000Z"
>>          }
>>       },
>>       {
>>          "some wrong data":{
>>             "xx":3,
>>             "xxx":{
>>                "xx":11.149606704711914,
>>                "yyy":7.503936767578125
>>             }
>>          }
>>       }
>>    ],
>>    "metadata":{
>>    }
>> }
>>
>> On the sink topic we would like to have:
>> offset: 0
>> key: "23dc78ffa6c1ad3c038b"
>> value:
>> {
>>    "deviceId":"23dc78ffa6c1ad3c038b",
>>    "event":{
>>       "lvl":2,
>>       "someValue":{
>>          "horizontal":9.448819160461426,
>>          "vertical":1.4566929340362549
>>       },
>>       "timestamp":"2024-04-08T02:06:31.000Z"
>>    }
>> }
>>
>> offset: 1
>> key: "23dc78ffa"
>> value:
>> {
>>    "deviceId":"23dc78ffa",
>>    "event":{
>>       "lvl":3,
>>       "someValue":{
>>          "horizontal":11.149606704711914,
>>          "vertical":7.503936767578125
>>       },
>>       "consequence":2,
>>       "timestamp":"2024-04-08T02:06:49.000Z"
>>    }
>> }
>>
>> The perfect solution should use only two topics, source and sink (no
>> additional repartition and changelog topics), and, as I mentioned before,
>> we do not want to introduce manual validation in the flatMap method (like
>> "if null" statements); we would like to use the automatic JSON schema
>> validation feature for that.
>>
>> On Tue, 25 Feb 2025 at 17:02, Paweł Szymczyk <pawel.szymczy...@gmail.com>
>> wrote:
>>
>>> The ideal solution has two topics. Can we somehow do a flatMap and change
>>> the key without creating the internal repartition topic? This would allow
>>> us to skip using the internal repartition topic as the source for the
>>> KTable.
>>>
>>> On 25 February 2025 at 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
>>> wrote:
>>>
>>>> Hi Pawel,
>>>>
>>>> I am not completely sure I understand the issue, because I am not a JSON
>>>> expert.
>>>>
>>>> Is my understanding correct that the serdes that you use to read from the
>>>> input topic and from the repartition topic also do the validation of the
>>>> JSON?
>>>>
>>>> Regarding points 2 and 3:
>>>> I agree that the dependency on the name of the repartition topic is not a
>>>> good idea. The naming of the repartition topic is rather an implementation
>>>> detail that should not be leaked. You could improve on it by writing the
>>>> array elements to a regular output topic that you name as you want. Then,
>>>> you can read the elements again from the output topic and write them into
>>>> the table.
>>>>
>>>> Regarding point 1:
>>>> Could you do the validation when you write the elements to the output
>>>> topic?
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>>>
>>>>> Dear Kafka users,
>>>>>
>>>>> The last few days I spent working with Kafka Streams on some tasks which
>>>>> looked very easy at first glance, but I ended up struggling with the
>>>>> StreamsBuilder API and did something which I am not proud of. Please
>>>>> help me, I am open to any suggestions.
>>>>> On the input topic we have a message where, inside the data field, we
>>>>> have an array of particular devices received from a third-party company,
>>>>> for example:
>>>>> {
>>>>>   "request": {},
>>>>>   "response": {},
>>>>>   "data": [{
>>>>>       "deviceId": "23dc78ffa6c1ad3c038b",
>>>>>       "event": {
>>>>>         "lvl": 2,
>>>>>         "someValue": {
>>>>>           "horizontal": 9.448819160461426,
>>>>>           "vertical": 1.4566929340362549
>>>>>         },
>>>>>         "timestamp": "2024-04-08T02:06:31.000Z"
>>>>>       }
>>>>>     },
>>>>>     {
>>>>>       "deviceId": "23dc78ffa",
>>>>>       "event": {
>>>>>         "lvl": 3,
>>>>>         "someValue": {
>>>>>           "horizontal": 11.149606704711914,
>>>>>           "vertical": 7.503936767578125
>>>>>         },
>>>>>         "consequence": 2,
>>>>>         "timestamp": "2024-04-08T02:06:49.000Z"
>>>>>       }
>>>>>     }
>>>>>   ],
>>>>>   "metadata": {}
>>>>> }
>>>>>
>>>>> A single message can contain tens or even occasionally hundreds of
>>>>> elements inside the data array. This part is given to us by the external
>>>>> company, so it is hard to do anything about that.
>>>>>
>>>>> I need to implement a two-stage validation:
>>>>>
>>>>> 1. Validate the root structure (request, response, data, and metadata
>>>>>    fields)
>>>>> 2. Validate each element in the data array individually
>>>>>
>>>>> When an individual element from the array fails validation:
>>>>>
>>>>> - Send only that element to a Dead Letter Topic
>>>>> - Continue processing other valid elements
>>>>>
>>>>> When validation passes:
>>>>>
>>>>> - Send each valid element as a separate Kafka record with a new key
>>>>>   (deviceId) to an output topic
>>>>>
>>>>> Now we have two separate, completely unconnected JSON schemas, one for
>>>>> the root element and a second for a single item from the array, and the
>>>>> code looks like this:
>>>>>
>>>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>>>
>>>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>>>>         Consumed.with(Serdes.String(), equipmentSerde))
>>>>>     .flatMap((nullKey, devices) -> devices.getResponse()
>>>>>         .getData()
>>>>>         .getData()
>>>>>         .stream()
>>>>>         .map(simpleEquipment -> new KeyValue<>(
>>>>>             simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>>>         .toList())
>>>>>     .repartition(Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>         .withKeySerde(Serdes.String())
>>>>>         .withValueSerde(nonValidatedDevicesSerde));
>>>>>
>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>>>     String.format("%s-%s-repartition",
>>>>>         properties.getEquipment().getApplicationId(),
>>>>>         "non-validated-repartition"),
>>>>>     Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>>>
>>>>> I would like to highlight two things about that code. When we have one
>>>>> schema, a single invalid element in the data array fails validation of
>>>>> the whole event, which is not acceptable. The second thing is that we do
>>>>> not want to introduce manual validation in the flatMap method; we would
>>>>> like to use automatic JSON schema validation for that.
>>>>>
>>>>> What we would like to improve:
>>>>> 1. We are repartitioning all messages (including invalid ones), which is
>>>>>    inefficient.
>>>>> 2. We have a fragile connection between the repartition method and the
>>>>>    table method:
>>>>>    Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>    String.format("%s-%s-repartition",
>>>>>        properties.getEquipment().getApplicationId(), "non-validated-repartition")
>>>>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>>>>>    pattern.
>>>>>
>>>>>
>>>> Paweł Szymczyk
>>>
>>
>>
>
Paweł Szymczyk
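P.S. For completeness, here is a rough sketch of the two-topic layout discussed above: Bruno's builder.stream(...).flatMap(...).to(...) suggestion combined with the validating serde and the DLQ deserialization exception handler from my previous mail. The topic names ("raw-devices", "devices-by-id"), the types (RawEnvelope, DeviceElement, ValidatedDevice) and the accessor methods below are placeholders for illustration, not our real code.

Serde<RawEnvelope> rawEnvelopeSerde = ...;          // lenient serde, parses only the outer structure
Serde<DeviceElement> deviceElementSerde = ...;      // plain serde used when writing single elements
Serde<ValidatedDevice> validatedDeviceSerde = ...;  // strict serde that enforces the element JSON schema

// 1) Split the envelope into one record per array element, keyed by deviceId,
//    and write the elements to an explicitly named output topic
//    (no internal repartition topic is created).
streamsBuilder.stream("raw-devices", Consumed.with(Serdes.String(), rawEnvelopeSerde))
    .<String, DeviceElement>flatMap((nullKey, envelope) -> envelope.getData()
        .stream()
        .map(element -> new KeyValue<>(element.getDeviceId(), element))
        .toList())
    .to("devices-by-id", Produced.with(Serdes.String(), deviceElementSerde));

// 2) Build the table from that named output topic. The strict serde validates each
//    element on read; elements that fail validation are routed to the DLQ by the
//    deserialization exception handler, and only the valid ones reach the table.
KTable<String, ValidatedDevice> devices = streamsBuilder.table(
    "devices-by-id",
    Consumed.with(Serdes.String(), validatedDeviceSerde));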