The ideal solution has two topics. Can we somehow do a flatMap and change the key without creating the internal repartition topic? That would allow us to skip using the internal repartition topic as the source for the KTable. Roughly what I mean is sketched below.
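
Something like this is what I have in mind, following Bruno's suggestion of an explicitly named topic. This is only a sketch: the topic name "devices-by-id" is made up, reKeyedElements stands for the KStream produced by the flatMap in my original mail, and the serdes are the placeholders from there:

    // reKeyedElements is the stream after the flatMap that re-keys each
    // array element by deviceId. Write it to an explicitly named topic
    // instead of calling repartition(), so no internal repartition topic
    // name leaks anywhere ("devices-by-id" is a made-up name).
    reKeyedElements.to("devices-by-id",
        Produced.with(Serdes.String(), nonValidatedDevicesSerde));

    // Read the same named topic back as the source of the KTable.
    KTable<String, ValidatedDevice> device = streamsBuilder.table(
        "devices-by-id",
        Consumed.with(Serdes.String(), validatedDeviceSerde));

The question is whether we can avoid even that intermediate topic, or whether a key-changing flatMap feeding a table always needs some topic in between.
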
On 25 February 2025 at 15:41:19 CET, Bruno Cadonna <cado...@apache.org> wrote:
>Hi Pawel,
>
>I am not completely sure I understand the issue, because I am not a JSON
>expert.
>
>Is my understanding correct that the serde that you use to read from the
>input topic and from the repartition topic also does the validation of the
>JSON?
>
>Regarding points 2 and 3:
>I agree that the dependency on the name of the repartition topic is not a
>good idea. The naming of the repartition topic is rather an implementation
>detail that should not be leaked. You could improve on it by writing the
>array elements to a regular output topic that you name as you want. Then,
>you can read the elements again from the output topic and write them into
>the table.
>
>Regarding point 1:
>Could you do the validation when you write the elements to the output topic?
>
>Best,
>Bruno
>
>On 25.02.25 14:20, Paweł Szymczyk wrote:
>> Dear Kafka users,
>>
>> The last few days I spent working with Kafka Streams on some tasks which
>> looked very easy at first glance, but finally I struggled with the Streams
>> Builder API and did something which I am not proud of. Please help me, I
>> am open to any suggestions.
>> On the input topic we have a message where, inside the data field, there
>> is an array of particular devices received from a third-party company,
>> for example:
>>
>> {
>>     "request": {},
>>     "response": {},
>>     "data": [{
>>         "deviceId": "23dc78ffa6c1ad3c038b",
>>         "event": {
>>             "lvl": 2,
>>             "someValue": {
>>                 "horizontal": 9.448819160461426,
>>                 "vertical": 1.4566929340362549
>>             },
>>             "timestamp": "2024-04-08T02:06:31.000Z"
>>         }
>>     },
>>     {
>>         "deviceId": "23dc78ffa",
>>         "event": {
>>             "lvl": 3,
>>             "someValue": {
>>                 "horizontal": 11.149606704711914,
>>                 "vertical": 7.503936767578125
>>             },
>>             "consequence": 2,
>>             "timestamp": "2024-04-08T02:06:49.000Z"
>>         }
>>     }],
>>     "metadata": {}
>> }
>>
>> A single message can contain tens or occasionally even hundreds of
>> elements inside the data array. This part is given to us by the external
>> company, so it is hard to do anything about that.
>>
>> I need to implement a two-stage validation:
>>
>> 1. Validate the root structure (request, response, data, and metadata
>> fields)
>> 2. Validate each element in the data array individually
>>
>> When an individual element from the array fails validation:
>>
>> - Send only that element to a Dead Letter Topic
>> - Continue processing other valid elements
>>
>> When validation passes:
>>
>> - Send each valid element as a separate Kafka record with a new key
>> (deviceId) to an output topic
>>
>> Now we have two separate, completely unconnected JSON schemas, one for
>> the root element and a second for a single item from the array, and the
>> code looks like this:
>>
>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>
>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>         Consumed.with(Serdes.String(), equipmentSerde))
>>     .flatMap((nullKey, devices) -> devices.getResponse()
>>         .getData()
>>         .getData()
>>         .stream()
>>         .map(simpleEquipment -> new KeyValue<>(
>>             simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>         .toList())
>>     .repartition(Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>         .withKeySerde(Serdes.String())
>>         .withValueSerde(nonValidatedDevicesSerde));
>>
>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>     String.format("%s-%s-repartition",
>>         properties.getEquipment().getApplicationId(), "non-validated-repartition"),
>>     Consumed.with(Serdes.String(), validatedDeviceSerde));
>>
>> I would like to highlight two things about that code. When we have one
>> schema, a single invalid element in the data array fails validation of
>> the whole event, which is not acceptable. The second thing is that we
>> don't want to introduce manual validation in the flatMap method; we would
>> like to use automatic JSON Schema validation for that.
>>
>> What we would like to improve:
>> 1. We're repartitioning all messages (including invalid ones), which is
>> inefficient.
>> 2. We have a fragile connection between the repartition method and the
>> table method:
>> Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>> String.format("%s-%s-repartition",
>> properties.getEquipment().getApplicationId(), "non-validated-repartition")
>> 3. Kafka Streams internals are unnecessarily exposed in the topic naming
>> pattern
>>
>> Paweł Szymczyk
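
PS. Regarding the per-element validation and dead letter routing, this is roughly the shape I am experimenting with. It is only a sketch: isValid() stands in for whatever automatic JSON Schema check we end up with, and the topic names devices-by-id and devices-dlq are made up:

    // Split the re-keyed elements into valid and invalid branches.
    Map<String, KStream<String, NonValidatedDevices>> branches =
        streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
                Consumed.with(Serdes.String(), equipmentSerde))
            .flatMap((nullKey, devices) -> devices.getResponse()
                .getData()
                .getData()
                .stream()
                .map(e -> new KeyValue<>(e.getEquipmentNumber(), e))
                .toList())
            .split(Named.as("validation-"))
            .branch((deviceId, element) -> isValid(element), Branched.as("valid"))
            .defaultBranch(Branched.as("invalid"));

    // Valid elements become the source of the table...
    branches.get("validation-valid")
        .to("devices-by-id", Produced.with(Serdes.String(), nonValidatedDevicesSerde));

    // ...while each invalid element goes to the dead letter topic on its
    // own, so the rest of the array keeps flowing.
    branches.get("validation-invalid")
        .to("devices-dlq", Produced.with(Serdes.String(), nonValidatedDevicesSerde));

This way only elements that pass validation would reach the topic that feeds the KTable, which should also address the inefficiency of repartitioning invalid messages.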