Dear Kafka users,

I spent the last few days working with Kafka Streams on a task that looked
very easy at first glance, but I ended up struggling with the StreamsBuilder
API and wrote something I am not proud of. Please help me; I am open to any
suggestions.
On the input topic, each message has a data field containing an array of
devices received from a third-party company, for example:
{
    "request": {},
    "response": {},
    "data": [{
            "deviceId": "23dc78ffa6c1ad3c038b",
            "event": {
                "lvl": 2,
                "someValue": {
                    "horizontal": 9.448819160461426,
                    "vertical": 1.4566929340362549
                },
                "timestamp": "2024-04-08T02:06:31.000Z"
            }
        },
        {
            "deviceId": "23dc78ffa",
            "event": {
                "lvl": 3,
                "someValue": {
                    "horizontal": 11.149606704711914,
                    "vertical": 7.503936767578125
                },
                "consequence": 2,
                "timestamp": "2024-04-08T02:06:49.000Z"
            }
        }
    ],
    "metadata": {}
}

A single message can contain tens, or occasionally even hundreds, of elements
in the data array. This format is given to us by the external company, so
we can hardly change it.

I need to implement a two-stage validation:

   1. Validate the root structure (request, response, data, and metadata
   fields)
   2. Validate each element in the data array individually

When an individual element from the array fails validation:

   - Send only that element to a Dead Letter Topic
   - Continue processing other valid elements

When validation passes:

   - Send each valid element as a separate Kafka record with a new key
   (deviceId) to an output topic
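
To make the intended per-element routing concrete, here is a minimal
plain-Java sketch with no Kafka involved. It is only an illustration of the
rules above, not our actual code: the names ElementRouter, Routed and route
are hypothetical, and modelling each array element as a Map<String, Object>
is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper: splits the elements of one message's data array by validity. */
final class ElementRouter {

    /** Valid elements keyed by deviceId, plus the invalid ones meant for the Dead Letter Topic. */
    static final class Routed {
        final List<Map.Entry<String, Map<String, Object>>> valid = new ArrayList<>();
        final List<Map<String, Object>> deadLetters = new ArrayList<>();
    }

    /**
     * Routes every element on its own, so one invalid element never
     * affects the others. The validator is assumed to guarantee that
     * "deviceId" is a non-null String for every element it accepts.
     */
    static Routed route(List<Map<String, Object>> data,
                        Predicate<Map<String, Object>> isValid) {
        Routed routed = new Routed();
        for (Map<String, Object> element : data) {
            if (isValid.test(element)) {
                // New record key is the element's deviceId.
                String deviceId = (String) element.get("deviceId");
                routed.valid.add(Map.entry(deviceId, element));
            } else {
                // Only the failing element goes to the dead-letter side.
                routed.deadLetters.add(element);
            }
        }
        return routed;
    }
}
```

Here isValid stands in for the per-element JSON schema check; the root
schema check would run before route is called.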

Right now we have two separate, completely unconnected JSON schemas, one for
the root element and one for a single item of the array, and the code looks
like this:

Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
Serde<ValidatedDevice> validatedDeviceSerde = ...;

streamsBuilder
    .stream(properties.getEquipment().getRawTopicDevices(),
            Consumed.with(Serdes.String(), equipmentSerde))
    .flatMap((nullKey, devices) -> devices.getResponse()
        .getData()
        .getData()
        .stream()
        .map(simpleEquipment -> new KeyValue<>(
            simpleEquipment.getEquipmentNumber(), simpleEquipment))
        .toList())
    .repartition(Repartitioned
        .<String, NonValidatedDevices>as("non-validated-repartition")
        .withKeySerde(Serdes.String())
        .withValueSerde(nonValidatedDevicesSerde));

KTable<String, ValidatedDevice> device = streamsBuilder.table(
    String.format("%s-%s-repartition",
        properties.getEquipment().getApplicationId(),
        "non-validated-repartition"),
    Consumed.with(Serdes.String(), validatedDeviceSerde));

I would like to highlight two things about this code. First, with a single
schema, one invalid element in the data array fails validation of the whole
event, which is not acceptable. Second, we would rather not introduce manual
validation in the flatMap method; we would like to use automatic JSON schema
validation for that.

What we would like to improve:
1. We're repartitioning all messages (including invalid ones), which is
inefficient.
2. We have a fragile connection between the repartition call and the table
call:
    Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
    String.format("%s-%s-repartition",
        properties.getEquipment().getApplicationId(),
        "non-validated-repartition")
3. Kafka Streams internals are unnecessarily exposed in the topic naming
pattern.
