Hi Kafka community,

I'm new to using KTables in Kafka Streams, and I'm struggling with updating KTable values by a non-key field.
I have a stream of BoxData that I want to enrich with BoxInfo objects. For the BoxInfo objects I created a compacted topic `boxInfo`, which I read as a KTable. Because the BoxData stream and the BoxInfo table are already keyed by the same ID, joining them works fine.

Updating BoxInfo objects by their ID also works fine. I write events that change BoxInfo objects (keyed by the BoxInfo ID) to a topic, read this topic as a stream, join the stream with the boxInfo KTable, and write the adapted BoxInfo back to the compacted topic (see Code Block 1). From those change events I can only extract the ID of the changed BoxInfo object, not the changes themselves, so instead of writing the changes directly I just set an invalid flag. I know this is not ideal.

My actual question: how can I do such an invalidation when I have to mark as invalid all BoxInfo objects that match a field other than the key? I came up with three possible solutions, but none of them seems ideal.

Possibility 1: primitive solution
For every invalidation event that identifies BoxInfo objects by some non-key field, read the whole BoxInfo KTable and mark the matching entries. This does not seem very smart or scalable, because every single event requires a full scan of the table.

Possibility 2: KTable solution based on the idea above
This is the same approach as in Code Block 1, but the BoxInfo KTable is re-keyed first (by portal ID, see Code Block 2). Storing a list as a store value seems quite resource intensive, so this is not ideal either.

Possibility 3: KStreams solution
To avoid a store that maps a key to a list of objects as in Possibility 2, I could read the boxInfo topic as a stream and join it with the invalidation stream (see Code Block 3). This won't work as written, because in a stream-stream join records from both sides trigger processing. Is there a way to prevent updates from one side from triggering the join?

Are there any other possibilities that I did not come up with?
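To make the secondary-index idea behind Possibility 2 more concrete, here is a plain-Java sketch with no Kafka dependency, under stated assumptions: BoxInfo is reduced to a hypothetical record holding an ID, a set of portal IDs, and an invalid flag, and both the table and the index are ordinary HashMaps rather than state stores. The index maps each portal ID to the IDs of the boxes that reference it instead of full BoxInfo copies, and every upsert first removes the box from the portals of its previous version, so the index holds no stale or duplicate entries. Invalidation by portal ID then becomes an index lookup followed by per-key updates, the same step that already works in Code Block 1.

```java
import java.util.*;

// Plain-Java model (no Kafka) of a primary table keyed by box ID plus a
// secondary index keyed by portal ID. Mapping both maps onto Kafka Streams
// state stores is an assumption and is not shown here.
public class PortalIndexSketch {

    // Hypothetical stand-in for the real BoxInfo class (only the fields needed here).
    record BoxInfo(String id, Set<Integer> portals, boolean invalid) {
        BoxInfo withInvalid(boolean value) { return new BoxInfo(id, portals, value); }
    }

    // Primary "table": box ID -> latest BoxInfo (like the compacted boxInfo topic).
    final Map<String, BoxInfo> boxes = new HashMap<>();
    // Secondary index: portal ID -> IDs of the boxes referencing it (IDs only, no copies).
    final Map<Integer, Set<String>> boxIdsByPortal = new HashMap<>();

    // Upsert a BoxInfo and keep the index consistent: drop the mappings of the
    // previous version first, then add mappings for the current portals.
    void upsert(BoxInfo updated) {
        BoxInfo previous = boxes.put(updated.id(), updated);
        if (previous != null) {
            for (Integer portal : previous.portals()) {
                Set<String> ids = boxIdsByPortal.get(portal);
                if (ids != null) {
                    ids.remove(previous.id());
                    if (ids.isEmpty()) boxIdsByPortal.remove(portal);
                }
            }
        }
        for (Integer portal : updated.portals()) {
            boxIdsByPortal.computeIfAbsent(portal, p -> new HashSet<>()).add(updated.id());
        }
    }

    // Invalidation event keyed by portal ID: look up the affected box IDs, then
    // invalidate each box by its primary key. Iterates over a copy because
    // upsert() modifies the same index set.
    List<String> invalidatePortal(int portalId) {
        List<String> invalidated = new ArrayList<>();
        for (String boxId : List.copyOf(boxIdsByPortal.getOrDefault(portalId, Set.of()))) {
            BoxInfo box = boxes.get(boxId);
            if (box != null) {
                upsert(box.withInvalid(true));
                invalidated.add(boxId);
            }
        }
        Collections.sort(invalidated);
        return invalidated;
    }

    public static void main(String[] args) {
        PortalIndexSketch s = new PortalIndexSketch();
        s.upsert(new BoxInfo("box-1", Set.of(1, 2), false));
        s.upsert(new BoxInfo("box-2", Set.of(2), false));
        s.upsert(new BoxInfo("box-3", Set.of(3), false));
        System.out.println(s.invalidatePortal(2));            // [box-1, box-2]
        System.out.println(s.boxes.get("box-3").invalid());   // false
    }
}
```

In a Kafka Streams topology the two maps would presumably become state stores updated by a single processor (for example via the Processor API); that translation is an assumption, not something this sketch verifies.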
This seems like quite a common scenario, but I could not find any solutions on the internet, and none of the solutions listed here seem good. Thanks for your feedback.

Best,
Claudia Kesslau

Code Block 1:
_______________________________________________________________________________________________________
KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<String, String> invalidationStream = builder.stream(...);

// Invalidation events are keyed by box ID, so a stream-table join finds the BoxInfo directly
invalidationStream
    .leftJoin(boxInfos, (aVoid, boxInfo) -> boxInfo)
    .filter((boxId, boxInfo) -> nonNull(boxInfo))
    .mapValues(boxInfo -> boxInfo.setInvalid(true)) // assumes setInvalid returns the BoxInfo
    .to(boxInfoTableName, ...);
_______________________________________________________________________________________________________

Code Block 2:
_______________________________________________________________________________________________________
KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

// Re-key BoxInfo by portal ID and collect all BoxInfos per portal into a list
KTable<Integer, List<BoxInfo>> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"))
    .groupBy((portalId, boxInfo) -> portalId)
    .aggregate(ArrayList::new,
        (portalId, boxInfo, boxInfoList) -> {
            boxInfoList.add(boxInfo);
            return boxInfoList;
        },
        Materialized.with(Serdes.Integer(), new BoxInfoListSerde()));

invalidationStream
    .leftJoin(boxInfosByPortalId, (aVoid, boxInfoList) -> boxInfoList)
    // leftJoin yields null if the portal has no list; drop it to avoid an NPE in flatMapByBoxId
    .filter((portalId, boxInfoList) -> nonNull(boxInfoList))
    .flatMap(this::flatMapByBoxId, Named.as("Stream-BoxInfoById"))
    .mapValues(boxInfo -> boxInfo.setInvalid(true))
    .to(boxInfoTableName, ...);

List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    // Tombstones (null values) produce no records
    if (null == boxInfo) {
        return List.of();
    }
    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}

List<KeyValue<String, BoxInfo>> flatMapByBoxId(Integer portalId, List<BoxInfo> boxInfoList) {
    return boxInfoList.stream()
        .filter(Objects::nonNull)
        .map(boxInfo -> KeyValue.pair(boxInfo.getId(), boxInfo))
        .toList();
}
_______________________________________________________________________________________________________

Code Block 3:
_______________________________________________________________________________________________________
KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

// Re-key BoxInfo by portal ID, this time as a stream instead of an aggregated table
KStream<Integer, BoxInfo> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"));

// Stream-stream join: records from either side trigger the join.
// JoinWindows.of(ZERO) only matches records with identical timestamps.
invalidationStream
    .leftJoin(boxInfosByPortalId,
        (aVoid, boxInfo) -> boxInfo,
        JoinWindows.of(ZERO),
        StreamJoined.with(Serdes.Integer(), Serdes.String(), new BoxInfoSerde()))
    .filter((portalId, boxInfo) -> nonNull(boxInfo))
    .selectKey((portalId, boxInfo) -> boxInfo.getId())
    .mapValues(boxInfo -> boxInfo.setInvalid(true))
    .to(boxInfoTableName, ...);

List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    if (null == boxInfo) {
        return List.of();
    }
    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}
_______________________________________________________________________________________________________
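Besides the memory cost, the aggregation in Code Block 2 only ever appends: `boxInfos.toStream()` emits every new version of a BoxInfo, including the invalidated copies that the topology itself writes back to `boxInfo`, and the aggregator adds each one to the list without removing the previous version. The plain-Java model below (no Kafka; `BoxInfo` is a hypothetical record with only the fields needed) replays a small changelog through the same fan-out-and-append logic to show how the per-portal lists accumulate duplicates and keep boxes under portals they have left.

```java
import java.util.*;

// Plain-Java model (no Kafka) of the fan-out plus append-only aggregate in Code Block 2.
public class AppendOnlyAggregateSketch {

    // Hypothetical stand-in for BoxInfo with only the fields used here.
    record BoxInfo(String id, List<Integer> portals, boolean invalid) {}

    // Each changelog record is fanned out by portal ID (like flatMapByPortalId)
    // and appended to that portal's list (like the aggregator), never removed.
    static Map<Integer, List<BoxInfo>> appendOnly(List<BoxInfo> changelog) {
        Map<Integer, List<BoxInfo>> byPortal = new TreeMap<>();
        for (BoxInfo box : changelog) {
            for (Integer portal : box.portals()) {
                byPortal.computeIfAbsent(portal, p -> new ArrayList<>()).add(box);
            }
        }
        return byPortal;
    }

    public static void main(String[] args) {
        List<BoxInfo> changelog = List.of(
            new BoxInfo("box-1", List.of(1, 2), false),
            // invalidated copy written back to boxInfo by the topology itself
            new BoxInfo("box-1", List.of(1, 2), true),
            // a later update moves box-1 off portal 2
            new BoxInfo("box-1", List.of(1), false));
        Map<Integer, List<BoxInfo>> byPortal = appendOnly(changelog);
        System.out.println(byPortal.get(1).size()); // 3: three versions of the same box
        System.out.println(byPortal.get(2).size()); // 2: box-1 is still listed under portal 2
    }
}
```

Avoiding this presumably requires removing a box from the portals of its previous version before adding it under its new portals, which in turn means remembering the previous version somewhere keyed by box ID; how best to do that in the DSL is left open here.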