Dear Kafka users,

I have spent the last few days working with Kafka Streams on a task that looked very easy at first glance, but I ended up struggling with the StreamsBuilder API and wrote something I am not proud of. Please help me, I am open to any suggestions.

On the input topic we receive messages where the "data" field contains an array of device readings from a third-party company, for example:

{
  "request": {},
  "response": {},
  "data": [
    {
      "deviceId": "23dc78ffa6c1ad3c038b",
      "event": {
        "lvl": 2,
        "someValue": {
          "horizontal": 9.448819160461426,
          "vertical": 1.4566929340362549
        },
        "timestamp": "2024-04-08T02:06:31.000Z"
      }
    },
    {
      "deviceId": "23dc78ffa",
      "event": {
        "lvl": 3,
        "someValue": {
          "horizontal": 11.149606704711914,
          "vertical": 7.503936767578125
        },
        "consequence": 2,
        "timestamp": "2024-04-08T02:06:49.000Z"
      }
    }
  ],
  "metadata": {}
}
A single message can contain tens, or occasionally even hundreds, of elements inside the data array. This format is dictated by the external company, so there is little we can do about it.

I need to implement a two-stage validation (see the topology sketch at the end of this message):

1. Validate the root structure (the request, response, data, and metadata fields).
2. Validate each element of the data array individually.

When an individual element fails validation:
- send only that element to a dead letter topic,
- continue processing the remaining valid elements.

When an element passes validation:
- send it as a separate Kafka record, keyed by its deviceId, to an output topic.

We now have two separate, completely unconnected JSON schemas, one for the root element and one for a single item of the array (both sketched below), and the code looks like this:

Serde<NonValidatedDevices> nonValidatedDevicesSerde = ...;
Serde<ValidatedDevice> validatedDeviceSerde = ...;

streamsBuilder
    .stream(properties.getEquipment().getRawTopicDevices(),
            Consumed.with(Serdes.String(), equipmentSerde))
    .flatMap((nullKey, devices) -> devices.getResponse()
            .getData()
            .getData()
            .stream()
            .map(simpleEquipment -> new KeyValue<>(
                    simpleEquipment.getEquipmentNumber(), simpleEquipment))
            .toList())
    .repartition(Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")
            .withKeySerde(Serdes.String())
            .withValueSerde(nonValidatedDevicesSerde));

KTable<String, ValidatedDevice> device = streamsBuilder.table(
        String.format("%s-%s-repartition",
                properties.getEquipment().getApplicationId(),
                "non-validated-repartition"),
        Consumed.with(Serdes.String(), validatedDeviceSerde));

I would like to highlight two things about this code. First, when we had a single combined schema, one invalid element in the data array failed validation of the whole event, which is not acceptable (that is why we split the schemas). Second, we do not want to introduce manual validation inside the flatMap method; we would like to use automatic JSON Schema validation for that.

What we would like to improve:

1. We are repartitioning all messages, including the invalid ones, which is inefficient.
2. There is a fragile connection between the repartition() call and the table() call, held together only by matching strings:

   Repartitioned.<String, NonValidatedDevices>as("non-validated-repartition")

   String.format("%s-%s-repartition",
           properties.getEquipment().getApplicationId(),
           "non-validated-repartition")

3. Kafka Streams internals are unnecessarily exposed through the internal topic naming pattern.
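For completeness, the two JSON Schemas are roughly shaped as follows. These are simplified sketches reconstructed from the example message above, not our real schemas, and the required-field lists are my best guess. The detail that matters is that the root schema treats the items of the data array as opaque ("items": true), so a malformed element cannot fail root-level validation:

Root schema (sketch):

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["request", "response", "data", "metadata"],
  "properties": {
    "request":  { "type": "object" },
    "response": { "type": "object" },
    "data":     { "type": "array", "items": true },
    "metadata": { "type": "object" }
  }
}

Element schema (sketch):

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["deviceId", "event"],
  "properties": {
    "deviceId": { "type": "string" },
    "event": {
      "type": "object",
      "required": ["lvl", "someValue", "timestamp"],
      "properties": {
        "lvl": { "type": "integer" },
        "someValue": {
          "type": "object",
          "properties": {
            "horizontal": { "type": "number" },
            "vertical": { "type": "number" }
          }
        },
        "consequence": { "type": "integer" },
        "timestamp": { "type": "string", "format": "date-time" }
      }
    }
  }
}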
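And here is a minimal sketch of the topology shape I have in mind for the two-stage validation, using split()/Branched (available since Kafka Streams 2.8) to fork per element. It is not working code from our project: jsonNodeSerde, ElementValidator, and the topic names "raw-topic-devices", "devices-dlt", and "devices-validated" are hypothetical placeholders, and the per-element JSON Schema check is hidden behind elementValidator.isValid():

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder streamsBuilder = new StreamsBuilder();
Serde<JsonNode> jsonNodeSerde = ...;       // stage 1 (root schema) assumed to run in the deserializer
ElementValidator elementValidator = ...;   // hypothetical wrapper around the element JSON Schema

// Explode the data array: one record per element, keyed by deviceId.
KStream<String, JsonNode> elements = streamsBuilder
        .stream("raw-topic-devices", Consumed.with(Serdes.String(), jsonNodeSerde))
        .flatMap((nullKey, root) -> {
            List<KeyValue<String, JsonNode>> out = new ArrayList<>();
            root.get("data").forEach(element ->
                    out.add(new KeyValue<>(element.path("deviceId").asText(), element)));
            return out;
        });

// Stage 2: validate each element individually and fork the stream;
// invalid elements go to the DLT, valid ones continue downstream.
Map<String, KStream<String, JsonNode>> branches = elements
        .split(Named.as("validation-"))
        .branch((deviceId, element) -> elementValidator.isValid(element),
                Branched.as("ok"))
        .defaultBranch(Branched.as("dlt"));

branches.get("validation-dlt")
        .to("devices-dlt", Produced.with(Serdes.String(), jsonNodeSerde));
branches.get("validation-ok")
        .to("devices-validated", Produced.with(Serdes.String(), jsonNodeSerde));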
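On improvement points 2 and 3, one direction I am considering is KStream#toTable() (available since Kafka Streams 2.5). Because the stream was re-keyed in flatMap, toTable() inserts the backing repartition topic itself, with a name derived by the library, so both magic strings disappear; and since only the valid branch reaches it, only valid records get repartitioned, which would also address point 1. A sketch continuing from the "validation-ok" branch above, where toValidatedDevice() is a made-up conversion step:

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Convert the stream of valid, re-keyed elements into a table; Kafka Streams
// creates and names the internal repartition topic on its own.
KTable<String, ValidatedDevice> device = branches.get("validation-ok")
        .mapValues(element -> toValidatedDevice(element))   // hypothetical mapping
        .toTable(Materialized.with(Serdes.String(), validatedDeviceSerde));

Does this direction look reasonable, or is there a more idiomatic way to structure it?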