Hi Bruno,

Many thanks for trying to understand the problem. I prepared a repository with a simple demo: https://github.com/pszymczyk/streams-flatmap. You can run Main.main under /src/main/java and then SimpleTest under /src/test/java; as a result you will have 6 messages on the custom-repartition topic and 3 messages on validated-devices.
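One way to make a snippet of that shape type-check while keeping the two-topic layout is a single mapValues step between flatMap and to, so the value type matches the sink Serde. The sketch below is only a suggestion: DeviceValidator.toValidated is a hypothetical converter that builds a ValidatedDevice from a NonValidatedDevice (and would be a natural place to apply the element-level JSON schema). Note that flatMap only marks the stream for repartitioning; since no stateful operation follows before to(), no repartition topic is created, which matches Bruno's remark quoted below.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    streamsBuilder.stream("raw-devices",
            Consumed.with(Serdes.String(), SerDeFactory.nonValidatedDevicesSerde()))
        // Fan out: one record per array element, re-keyed by deviceId.
        .flatMap((key, value) -> value.devices
            .stream()
            .map(device -> new KeyValue<>(device.deviceId, device))
            .toList())
        // Hypothetical converter/validator; the only extra step needed so
        // the value type lines up with the sink Serde.
        .mapValues(DeviceValidator::toValidated)
        // Writing with to() creates no repartition or changelog topic.
        .to("validated-devices",
            Produced.with(Serdes.String(), SerDeFactory.validatedDeviceSerde()));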
After writing that code I realized that what I would actually like to have in this case is something like the following (it does not compile right now):

streamsBuilder.stream("raw-devices",
        Consumed.with(Serdes.String(), SerDeFactory.nonValidatedDevicesSerde()))
    .flatMap((key1, value1) -> value1.devices
        .stream()
        .map(nonValidatedDevice -> new KeyValue<>(nonValidatedDevice.deviceId, nonValidatedDevice))
        .toList())
    .to("validated-devices",
        Produced.with(Serdes.String(), SerDeFactory.validatedDeviceSerde()));

which means that I would like to change the sink topic Serde on the fly, without mapping the objects manually.

On Wed, 26 Feb 2025 at 07:46, Paweł Szymczyk <pawel.szymczy...@gmail.com> wrote:

> I will provide you with the sample source code on GitHub today.
>
> On 25 February 2025 at 20:47:36 CET, Bruno Cadonna <cado...@apache.org> wrote:
>
>> Hi Pawel,
>>
>> What is the "automatic json schema validation feature"?
>> Streams does not have such a thing built-in.
>>
>> Do you have an example of third-party software that does that validation?
>>
>> Regarding decomposing your input records into the smaller array elements,
>> that should work with
>>
>> builder.stream(...).flatMap(...).to(...)
>>
>> This should not create any repartition or changelog topics.
>>
>> Have you tried that?
>>
>> The question is where to put the validation of the array elements. That
>> also depends on what you mean by "automatic json schema validation
>> feature".
>>
>> Best,
>> Bruno
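One possible shape for the builder.stream(...).flatMap(...).to(...) pipeline Bruno mentions above, extended with the per-element dead-letter routing that Paweł's requirements (quoted further below) call for. This is only a sketch: DeviceValidator.isValid, the per-element SerDeFactory.nonValidatedDeviceSerde() (singular), and the devices-dlt topic name are assumptions, not existing code.

    import java.util.Map;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Branched;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Named;
    import org.apache.kafka.streams.kstream.Produced;

    KStream<String, NonValidatedDevice> elements = streamsBuilder
        .stream("raw-devices",
            Consumed.with(Serdes.String(), SerDeFactory.nonValidatedDevicesSerde()))
        .flatMap((key, value) -> value.devices
            .stream()
            .map(device -> new KeyValue<>(device.deviceId, device))
            .toList());

    // Route each element: valid ones continue, invalid ones go to a DLT,
    // so one bad array element no longer fails the whole event.
    Map<String, KStream<String, NonValidatedDevice>> branches = elements
        .split(Named.as("devices-"))
        .branch((key, device) -> DeviceValidator.isValid(device), Branched.as("valid"))
        .defaultBranch(Branched.as("invalid"));

    branches.get("devices-valid")
        .mapValues(DeviceValidator::toValidated)
        .to("validated-devices",
            Produced.with(Serdes.String(), SerDeFactory.validatedDeviceSerde()));

    branches.get("devices-invalid")
        .to("devices-dlt",
            Produced.with(Serdes.String(), SerDeFactory.nonValidatedDeviceSerde()));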
>> On 25.02.25 17:22, Paweł Szymczyk wrote:
>>
>>> To make it clear, I will give you an example.
>>>
>>> On the source topic we have:
>>>
>>> offset: 0
>>> key: null
>>> value:
>>> {
>>>    "request":{},
>>>    "response":{},
>>>    "data":[
>>>       {
>>>          "deviceId":"23dc78ffa6c1ad3c038b",
>>>          "event":{
>>>             "lvl":2,
>>>             "someValue":{
>>>                "horizontal":9.448819160461426,
>>>                "vertical":1.4566929340362549
>>>             },
>>>             "timestamp":"2024-04-08T02:06:31.000Z"
>>>          }
>>>       },
>>>       {
>>>          "deviceId":"23dc78ffa",
>>>          "event":{
>>>             "lvl":3,
>>>             "someValue":{
>>>                "horizontal":11.149606704711914,
>>>                "vertical":7.503936767578125
>>>             },
>>>             "consequence":2,
>>>             "timestamp":"2024-04-08T02:06:49.000Z"
>>>          }
>>>       },
>>>       {
>>>          "some wrong data":{
>>>             "xx":3,
>>>             "xxx":{
>>>                "xx":11.149606704711914,
>>>                "yyy":7.503936767578125
>>>             }
>>>          }
>>>       }
>>>    ],
>>>    "metadata":{}
>>> }
>>>
>>> On the sink topic we would like to have:
>>>
>>> offset: 0
>>> key: "23dc78ffa6c1ad3c038b"
>>> value:
>>> {
>>>    "deviceId":"23dc78ffa6c1ad3c038b",
>>>    "event":{
>>>       "lvl":2,
>>>       "someValue":{
>>>          "horizontal":9.448819160461426,
>>>          "vertical":1.4566929340362549
>>>       },
>>>       "timestamp":"2024-04-08T02:06:31.000Z"
>>>    }
>>> }
>>>
>>> offset: 1
>>> key: "23dc78ffa"
>>> value:
>>> {
>>>    "deviceId":"23dc78ffa",
>>>    "event":{
>>>       "lvl":3,
>>>       "someValue":{
>>>          "horizontal":11.149606704711914,
>>>          "vertical":7.503936767578125
>>>       },
>>>       "consequence":2,
>>>       "timestamp":"2024-04-08T02:06:49.000Z"
>>>    }
>>> }
>>>
>>> And the perfect solution should use only two topics, source and sink (no
>>> additional repartition and changelog topics), and as I mentioned before we
>>> don't want to introduce manual validation (like if-null checks) in the
>>> flatMap method; we would like to use an automatic JSON schema validation
>>> feature for that.
>>>
>>> On Tue, 25 Feb 2025 at 17:02, Paweł Szymczyk <pawel.szymczy...@gmail.com>
>>> wrote:
>>>
>>>> The ideal solution has two topics. Can we somehow do a flatMap and change
>>>> the key without creating the internal repartition topic? This would allow
>>>> us to skip using the internal repartition topic as the source for the
>>>> KTable.
>>>>
>>>> On 25 February 2025 at 15:41:19 CET, Bruno Cadonna <cado...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Pawel,
>>>>>
>>>>> I am not completely sure I understand the issue, because I am not a JSON
>>>>> expert.
>>>>>
>>>>> Is my understanding correct that the serdes that you use to read from the
>>>>> input topic and from the repartition topic also do the validation of the
>>>>> JSON?
>>>>>
>>>>> Regarding points 2 and 3:
>>>>> I agree that the dependency on the name of the repartition topic is not a
>>>>> good idea. The naming of the repartition topic is rather an implementation
>>>>> detail that should not be leaked. You could improve on it by writing the
>>>>> array elements to a regular output topic that you name as you want. Then,
>>>>> you can read the elements again from the output topic and write them into
>>>>> the table.
>>>>>
>>>>> Regarding point 1:
>>>>> Could you do the validation when you write the elements to the output
>>>>> topic?
>>>>>
>>>>> Best,
>>>>> Bruno
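A sketch of the regular-output-topic variant Bruno describes here, reusing the names from Paweł's code quoted below. The intermediate topic name devices-by-id is illustrative, and validation is assumed to live in the serdes (on write via nonValidatedDevicesSerde, on read via validatedDeviceSerde):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    // Write the re-keyed elements to an explicitly named topic instead of
    // calling repartition(), so no internal topic name leaks into the code.
    streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
            Consumed.with(Serdes.String(), equipmentSerde))
        .flatMap((nullKey, devices) -> devices.getResponse()
            .getData()
            .getData()
            .stream()
            .map(simpleEquipment ->
                new KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
            .toList())
        .to("devices-by-id",
            Produced.with(Serdes.String(), nonValidatedDevicesSerde));

    // Read the elements back from the named topic into the table; the
    // validating serde is applied on this read path.
    KTable<String, ValidatedDevice> device = streamsBuilder.table(
        "devices-by-id",
        Consumed.with(Serdes.String(), validatedDeviceSerde));

With topology optimization enabled (StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG set to StreamsConfig.OPTIMIZE), Kafka Streams can reuse the source topic of such a table as its changelog, so this layout need not create an extra changelog topic either.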
>>>>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>>>>
>>>>>> Dear Kafka users,
>>>>>>
>>>>>> The last few days I spent working with Kafka Streams on some tasks which
>>>>>> looked very easy at first glance, but I finally struggled with the
>>>>>> StreamsBuilder API and did something I am not proud of. Please help me;
>>>>>> I am open to any suggestions.
>>>>>>
>>>>>> On the input topic we have a message where, inside the field data, we
>>>>>> have an array of particular devices received from a third-party company,
>>>>>> for example:
>>>>>>
>>>>>> {
>>>>>>   "request": {},
>>>>>>   "response": {},
>>>>>>   "data": [{
>>>>>>     "deviceId": "23dc78ffa6c1ad3c038b",
>>>>>>     "event": {
>>>>>>       "lvl": 2,
>>>>>>       "someValue": {
>>>>>>         "horizontal": 9.448819160461426,
>>>>>>         "vertical": 1.4566929340362549
>>>>>>       },
>>>>>>       "timestamp": "2024-04-08T02:06:31.000Z"
>>>>>>     }
>>>>>>   },
>>>>>>   {
>>>>>>     "deviceId": "23dc78ffa",
>>>>>>     "event": {
>>>>>>       "lvl": 3,
>>>>>>       "someValue": {
>>>>>>         "horizontal": 11.149606704711914,
>>>>>>         "vertical": 7.503936767578125
>>>>>>       },
>>>>>>       "consequence": 2,
>>>>>>       "timestamp": "2024-04-08T02:06:49.000Z"
>>>>>>     }
>>>>>>   }],
>>>>>>   "metadata": {}
>>>>>> }
>>>>>>
>>>>>> A single message can contain tens or even, occasionally, hundreds of
>>>>>> elements inside the data array. This part is given to us by the external
>>>>>> company, so it is hard to do anything about that.
>>>>>>
>>>>>> I need to implement a two-stage validation:
>>>>>>
>>>>>> 1. Validate the root structure (request, response, data, and metadata
>>>>>>    fields)
>>>>>> 2. Validate each element in the data array individually
>>>>>>
>>>>>> When an individual element from the array fails validation:
>>>>>>
>>>>>> - Send only that element to a Dead Letter Topic
>>>>>> - Continue processing the other valid elements
>>>>>>
>>>>>> When validation passes:
>>>>>>
>>>>>> - Send each valid element as a separate Kafka record with a new key
>>>>>>   (deviceId) to an output topic
>>>>>>
>>>>>> Now we have two separate, completely unconnected JSON schemas, one for
>>>>>> the root element and a second for a single item from the array, and the
>>>>>> code looks like this:
>>>>>>
>>>>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>>>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>>>>
>>>>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>>>>>         Consumed.with(Serdes.String(), equipmentSerde))
>>>>>>     .flatMap((nullKey, devices) -> devices.getResponse()
>>>>>>         .getData()
>>>>>>         .getData()
>>>>>>         .stream()
>>>>>>         .map(simpleEquipment ->
>>>>>>             new KeyValue<>(simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>>>>         .toList())
>>>>>>     .repartition(Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>>         .withKeySerde(Serdes.String())
>>>>>>         .withValueSerde(nonValidatedDevicesSerde));
>>>>>>
>>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>>>>     String.format("%s-%s-repartition",
>>>>>>         properties.getEquipment().getApplicationId(),
>>>>>>         "non-validated-repartition"),
>>>>>>     Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>>>>
>>>>>> I would like to highlight two things about that code. When we have one
>>>>>> schema, a single invalid element in the data array fails validation of
>>>>>> the whole event, which is not acceptable. The second thing is that we
>>>>>> don't want to introduce manual validation in the flatMap method; we
>>>>>> would like to use automatic JSON schema validation for that.
>>>>>>
>>>>>> What we would like to improve:
>>>>>>
>>>>>> 1. We're repartitioning all messages (including invalid ones), which is
>>>>>>    inefficient.
>>>>>> 2. We have a fragile connection between the repartition method and the
>>>>>>    table method:
>>>>>>    Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>>    String.format("%s-%s-repartition",
>>>>>>        properties.getEquipment().getApplicationId(),
>>>>>>        "non-validated-repartition")
>>>>>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>>>>>>    pattern.
>>>>>>
>>>>>> Paweł Szymczyk

--
Regards,
Paweł Szymczyk