Hi Bruno,
Many thanks for trying to understand the problem. I prepared a repository
with a simple demo: https://github.com/pszymczyk/streams-flatmap. You can
run /src/main/java/Main.main and then /src/test/java/SimpleTest; as a
result you will have 6 messages on the custom-repartition topic and 3
messages on validated-devices.
After writing that code I realized that what I would actually like to have
for this case is something like this (it does not compile right now):
streamsBuilder.stream("raw-devices",
        Consumed.with(Serdes.String(), SerDeFactory.nonValidatedDevicesSerde()))
    .flatMap((key, value) -> value.devices
        .stream()
        .map(nonValidatedDevice ->
            new KeyValue<>(nonValidatedDevice.deviceId, nonValidatedDevice))
        .toList())
    .to("validated-devices",
        Produced.with(Serdes.String(), SerDeFactory.validatedDeviceSerde()));
which means I would like to change the sink topic Serde on the fly,
without mapping the objects manually.
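Since Streams has no built-in way to swap the sink serde like that, the
closest workaround I can think of (a minimal sketch; ValidatingSerializer
and its predicate are hypothetical helpers of mine, not an existing Kafka
API) is a delegating serializer that runs the schema check while writing,
so no manual mapping is needed in the topology:

import java.util.function.Predicate;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical helper: wraps an existing serializer and validates every
// record (e.g. against a JSON schema) just before it is written to the sink.
public class ValidatingSerializer<T> implements Serializer<T> {

    private final Serializer<T> delegate;
    private final Predicate<T> isValid;

    public ValidatingSerializer(Serializer<T> delegate, Predicate<T> isValid) {
        this.delegate = delegate;
        this.isValid = isValid;
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (!isValid.test(data)) {
            // Fail the record instead of writing an invalid payload.
            throw new SerializationException(
                    "Record failed schema validation for topic " + topic);
        }
        return delegate.serialize(topic, data);
    }
}

It could be wired in with Serdes.serdeFrom(new ValidatingSerializer<>(innerSerializer,
schemaCheck), innerDeserializer) and handed to Produced.with(...), keeping
the flatMap itself free of validation code.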
On Wed, 26 Feb 2025 at 07:46, Paweł Szymczyk <[email protected]>
wrote:
> I will provide you with the sample source code on GitHub today.
>
>
> On 25 February 2025 20:47:36 CET, Bruno Cadonna <[email protected]>
> wrote:
>
>> Hi Pawel,
>>
>> What is the "automatic json schema validation feature"?
>> Streams does not have such a thing built-in.
>>
>> Do you have an example of third-party software that does that validation?
>>
>> Regarding decomposing your input records into the smaller array elements,
>> that should work with
>>
>> builder.stream(...).flatMap(...).to(...)
>>
>> This should not create any repartition or changelog topics.
>>
>> Have you tried that?
>>
>> The question is where to put the validation of the array elements. That
>> also depends on what you mean by "automatic json schema validation
>> feature".
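>>
>> If the validation can be expressed as a predicate, one shape that would
>> fit (a sketch only; the topic names and isValid(...) are placeholders,
>> not from your code) is to split the flat-mapped stream and route invalid
>> elements to a dead letter topic:
>>
>> // uses org.apache.kafka.streams.KeyValue and
>> // org.apache.kafka.streams.kstream.{Branched, Consumed}
>> builder.stream("raw-devices", Consumed.with(Serdes.String(), rootSerde))
>>     .flatMap((nullKey, root) -> root.getData().stream()
>>         .map(element -> new KeyValue<>(element.getDeviceId(), element))
>>         .toList())
>>     .split()
>>     .branch((deviceId, element) -> !isValid(element),
>>         Branched.withConsumer(invalid -> invalid.to("dead-letter-topic")))
>>     .defaultBranch(
>>         Branched.withConsumer(valid -> valid.to("validated-devices")));
>>
>> Neither the flatMap nor the split should create any internal topics.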
>>
>> Best,
>> Bruno
>>
>>
>>
>>
>> On 25.02.25 17:22, Paweł Szymczyk wrote:
>>
>>> To make it clear, I will give you an example:
>>>
>>> On the source topic we have:
>>>
>>> offset: 0
>>> key: null
>>> value:
>>> {
>>>   "request": {},
>>>   "response": {},
>>>   "data": [
>>>     {
>>>       "deviceId": "23dc78ffa6c1ad3c038b",
>>>       "event": {
>>>         "lvl": 2,
>>>         "someValue": {
>>>           "horizontal": 9.448819160461426,
>>>           "vertical": 1.4566929340362549
>>>         },
>>>         "timestamp": "2024-04-08T02:06:31.000Z"
>>>       }
>>>     },
>>>     {
>>>       "deviceId": "23dc78ffa",
>>>       "event": {
>>>         "lvl": 3,
>>>         "someValue": {
>>>           "horizontal": 11.149606704711914,
>>>           "vertical": 7.503936767578125
>>>         },
>>>         "consequence": 2,
>>>         "timestamp": "2024-04-08T02:06:49.000Z"
>>>       }
>>>     },
>>>     {
>>>       "some wrong data": {
>>>         "xx": 3,
>>>         "xxx": {
>>>           "xx": 11.149606704711914,
>>>           "yyy": 7.503936767578125
>>>         }
>>>       }
>>>     }
>>>   ],
>>>   "metadata": {}
>>> }
>>>
>>> On the sink topic we would like to have:
>>>
>>> offset: 0
>>> key: "23dc78ffa6c1ad3c038b"
>>> value:
>>> {
>>>   "deviceId": "23dc78ffa6c1ad3c038b",
>>>   "event": {
>>>     "lvl": 2,
>>>     "someValue": {
>>>       "horizontal": 9.448819160461426,
>>>       "vertical": 1.4566929340362549
>>>     },
>>>     "timestamp": "2024-04-08T02:06:31.000Z"
>>>   }
>>> }
>>>
>>> offset: 1
>>> key: "23dc78ffa"
>>> value:
>>> {
>>>   "deviceId": "23dc78ffa",
>>>   "event": {
>>>     "lvl": 3,
>>>     "someValue": {
>>>       "horizontal": 11.149606704711914,
>>>       "vertical": 7.503936767578125
>>>     },
>>>     "consequence": 2,
>>>     "timestamp": "2024-04-08T02:06:49.000Z"
>>>   }
>>> }
>>>
>>> And the perfect solution would use only two topics, source and sink (no
>>> additional repartition or changelog topics), and as I mentioned before we
>>> don't want to introduce manual validation (like if-null statements) in
>>> the flatMap method; we would like to use an automatic JSON schema
>>> validation feature for that.
>>>
>>> On Tue, 25 Feb 2025 at 17:02, Paweł Szymczyk <[email protected]>
>>> wrote:
>>>
>>>> The ideal solution has two topics. Can we somehow do a flatMap and
>>>> change the key without creating the internal repartition topic? That
>>>> would allow us to skip using the internal repartition topic as the
>>>> source for the KTable.
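>>>>
>>>> Roughly what I am hoping for (topic names here are just examples):
>>>>
>>>> builder.stream("raw-devices")
>>>>     .flatMap((key, value) -> ...)   // changes the key
>>>>     .to("validated-devices");       // plain sink, no internal topics
>>>>
>>>> builder.table("validated-devices", ...); // read it back as a KTable
>>>>
>>>> As far as I understand, Streams only inserts a repartition topic when a
>>>> key-changing operation is followed by a stateful operation or an
>>>> explicit repartition(), so this shape should stay at two topics.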
>>>>
>>>>
>>>> On 25 February 2025 15:41:19 CET, Bruno Cadonna <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Pawel,
>>>>>
>>>>> I am not completely sure I understand the issue, because I am not a JSON
>>>>> expert.
>>>>>
>>>>> Is my understanding correct that the serdes that you use to read from
>>>>> the input topic and from the repartition topic also do the validation
>>>>> of the JSON?
>>>>>
>>>>> Regarding points 2 and 3:
>>>>> I agree that depending on the name of the repartition topic is not a
>>>>> good idea. The naming of the repartition topic is rather an
>>>>> implementation detail that should not be leaked. You could improve on
>>>>> this by writing the array elements to a regular output topic that you
>>>>> name as you want. Then you can read the elements again from the output
>>>>> topic and write them into the table.
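>>>>>
>>>>> For example (a sketch; "validated-devices" is just a placeholder name):
>>>>>
>>>>> stream.flatMap(...)
>>>>>     .to("validated-devices",
>>>>>         Produced.with(Serdes.String(), validatedDeviceSerde));
>>>>>
>>>>> KTable<String, ValidatedDevice> table = builder.table("validated-devices",
>>>>>     Consumed.with(Serdes.String(), validatedDeviceSerde));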
>>>>>
>>>>> Regarding point 1:
>>>>> Could you do the validation when you write the elements to the output
>>>>> topic?
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On 25.02.25 14:20, Paweł Szymczyk wrote:
>>>>>
>>>>> Dear Kafka users,
>>>>>>
>>>>>> I have spent the last few days working with Kafka Streams on a task
>>>>>> that looked very easy at first glance, but I ended up struggling with
>>>>>> the StreamsBuilder API and did something I am not proud of. Please
>>>>>> help me, I am open to any suggestions.
>>>>>> On the input topic we have a message where, inside the data field, we
>>>>>> have an array of devices received from a third-party company, for
>>>>>> example:
>>>>>> {
>>>>>>   "request": {},
>>>>>>   "response": {},
>>>>>>   "data": [
>>>>>>     {
>>>>>>       "deviceId": "23dc78ffa6c1ad3c038b",
>>>>>>       "event": {
>>>>>>         "lvl": 2,
>>>>>>         "someValue": {
>>>>>>           "horizontal": 9.448819160461426,
>>>>>>           "vertical": 1.4566929340362549
>>>>>>         },
>>>>>>         "timestamp": "2024-04-08T02:06:31.000Z"
>>>>>>       }
>>>>>>     },
>>>>>>     {
>>>>>>       "deviceId": "23dc78ffa",
>>>>>>       "event": {
>>>>>>         "lvl": 3,
>>>>>>         "someValue": {
>>>>>>           "horizontal": 11.149606704711914,
>>>>>>           "vertical": 7.503936767578125
>>>>>>         },
>>>>>>         "consequence": 2,
>>>>>>         "timestamp": "2024-04-08T02:06:49.000Z"
>>>>>>       }
>>>>>>     }
>>>>>>   ],
>>>>>>   "metadata": {}
>>>>>> }
>>>>>>
>>>>>> A single message can contain tens or occasionally even hundreds of
>>>>>> elements inside the data array. This format is given to us by the
>>>>>> external company, so it is hard to do anything about it.
>>>>>>
>>>>>> I need to implement a two-stage validation:
>>>>>>
>>>>>> 1. Validate the root structure (the request, response, data, and
>>>>>>    metadata fields)
>>>>>> 2. Validate each element in the data array individually
>>>>>>
>>>>>> When an individual element from the array fails validation:
>>>>>>
>>>>>> - Send only that element to a Dead Letter Topic
>>>>>> - Continue processing the other, valid elements
>>>>>>
>>>>>> When validation passes:
>>>>>>
>>>>>> - Send each valid element as a separate Kafka record, with a new key
>>>>>>   (deviceId), to an output topic (a hand-rolled sketch of the
>>>>>>   per-element check follows below)
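>>>>>>
>>>>>> If we did write the per-element check by hand (exactly what we would
>>>>>> like to avoid), it would look roughly like this, using the
>>>>>> com.networknt:json-schema-validator library as one example; the schema
>>>>>> resource name is made up:
>>>>>>
>>>>>> import com.fasterxml.jackson.databind.JsonNode;
>>>>>> import com.networknt.schema.JsonSchema;
>>>>>> import com.networknt.schema.JsonSchemaFactory;
>>>>>> import com.networknt.schema.SpecVersion;
>>>>>>
>>>>>> // Compile the per-element schema once, then reuse it for every element.
>>>>>> private final JsonSchema elementSchema = JsonSchemaFactory
>>>>>>         .getInstance(SpecVersion.VersionFlag.V7)
>>>>>>         .getSchema(getClass().getResourceAsStream("/device-element.schema.json"));
>>>>>>
>>>>>> private boolean isValidElement(JsonNode element) {
>>>>>>     // validate(...) returns the set of violations; empty means valid
>>>>>>     return elementSchema.validate(element).isEmpty();
>>>>>> }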
>>>>>>
>>>>>> Now we have two separate, completely unconnected JSON schemas, one for
>>>>>> the root element and a second for a single item from the array, and
>>>>>> the code looks like this:
>>>>>>
>>>>>> Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
>>>>>> Serde<ValidatedDevice> validatedDeviceSerde = ...;
>>>>>>
>>>>>> streamsBuilder.stream(properties.getEquipment().getRawTopicDevices(),
>>>>>>         Consumed.with(Serdes.String(), nonValidatedDevicesSerde))
>>>>>>     .flatMap((nullKey, devices) -> devices.getResponse()
>>>>>>         .getData()
>>>>>>         .getData()
>>>>>>         .stream()
>>>>>>         .map(simpleEquipment -> new KeyValue<>(
>>>>>>             simpleEquipment.getEquipmentNumber(), simpleEquipment))
>>>>>>         .toList())
>>>>>>     .repartition(Repartitioned
>>>>>>         .<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>>         .withKeySerde(Serdes.String())
>>>>>>         .withValueSerde(nonValidatedDevicesSerde));
>>>>>>
>>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(
>>>>>>     String.format("%s-%s-repartition",
>>>>>>         properties.getEquipment().getApplicationId(),
>>>>>>         "non-validated-repartition"),
>>>>>>     Consumed.with(Serdes.String(), validatedDeviceSerde));
>>>>>>
>>>>>> I would like to highlight two things about that code. When we have one
>>>>>> schema, a single invalid element in the data array fails validation of
>>>>>> the whole event, which is not acceptable. The second thing is that we
>>>>>> don't want to introduce manual validation in the flatMap method; we
>>>>>> would like to use automatic JSON schema validation for that.
>>>>>>
>>>>>> What we would like to improve:
>>>>>>
>>>>>> 1. We're repartitioning all messages (including invalid ones), which
>>>>>>    is inefficient.
>>>>>> 2. We have a fragile connection between the repartition method and the
>>>>>>    table method (a stopgap is sketched below):
>>>>>>
>>>>>>    Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
>>>>>>
>>>>>>    String.format("%s-%s-repartition",
>>>>>>        properties.getEquipment().getApplicationId(),
>>>>>>        "non-validated-repartition")
>>>>>>
>>>>>> 3. Kafka Streams internals are unnecessarily exposed in the topic
>>>>>>    naming pattern.
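>>>>>>
>>>>>> For point 2, the least we could do is derive both strings from one
>>>>>> shared constant (a sketch against the snippet above; it does not
>>>>>> remove the coupling to the naming pattern, it only centralizes it):
>>>>>>
>>>>>> static final String REPARTITION_NAME = "non-validated-repartition";
>>>>>>
>>>>>> // Streams names the topic <application.id>-<name>-repartition, so both
>>>>>> // sides are built from the same constant.
>>>>>> stream.repartition(Repartitioned
>>>>>>         .<String, NonValidatedDevices>as(REPARTITION_NAME)
>>>>>>         .withKeySerde(Serdes.String())
>>>>>>         .withValueSerde(nonValidatedDevicesSerde));
>>>>>>
>>>>>> String tableSource = String.format("%s-%s-repartition",
>>>>>>         properties.getEquipment().getApplicationId(), REPARTITION_NAME);
>>>>>> KTable<String, ValidatedDevice> device = streamsBuilder.table(tableSource,
>>>>>>     Consumed.with(Serdes.String(), validatedDeviceSerde));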
>>>>>>
>>>>>>
>>>>>> Paweł Szymczyk
>>>>>
>>>>
>>>>
>>>
>>>
>> Paweł Szymczyk
>
--
Regards
Paweł Szymczyk