Hi Kafka community,

I'm new to the KTable functionality in Kafka Streams and I'm struggling with 
updating KTable values by a non-key field.

I've got a stream of BoxData that I want to enrich with BoxInfo objects. For 
those BoxInfo objects I created a compacted topic `boxInfo` and read this topic 
as a KTable. As both the stream of BoxData and the table of BoxInfo are already 
keyed on the same ID, joining them works fine.

Updating BoxInfo objects based on their ID also works fine. I write events 
changing BoxInfo objects by their key to a topic, read this topic as a stream, 
join this stream with the boxInfo KTable and write the adapted BoxInfo back to 
the compacted topic. See this in Code Block 1.
From those change events I can only extract the ID of the changed BoxInfo 
object, not the changes themselves. Therefore, I just set an invalid flag 
instead of writing the changes directly. I know this is not ideal.

The actual question is: how can I do such an invalidation if I have to mark as 
invalid all BoxInfo objects that match on a field other than the key?

I came up with 3 possible solutions, but none of them seems ideal:

Possibility 1: primitive solution
For every event indicating changed BoxData by some field, just read the whole 
BoxInfo KTable and mark matching entries. This does not seem very smart or 
scalable.
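
As a rough illustration of what Possibility 1 boils down to, here is a minimal 
plain-Java sketch without Kafka Streams. A Map stands in for the KTable's 
state store, and the BoxInfo record with its three fields is a simplified, 
hypothetical stand-in for my real class. Every invalidation event walks the 
whole store, which is why I doubt it scales.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FullScanSketch {

    // Hypothetical, simplified stand-in for BoxInfo; only the fields needed here.
    record BoxInfo(String id, Set<Integer> portals, boolean invalid) {
        BoxInfo invalidated() {
            return new BoxInfo(id, portals, true);
        }
    }

    // Scans every entry of the store and returns invalidated copies of the
    // entries that reference the given portal. Cost per event: O(store size).
    static List<BoxInfo> invalidateByPortal(Map<String, BoxInfo> store, int portalId) {
        List<BoxInfo> updates = new ArrayList<>();
        for (BoxInfo boxInfo : store.values()) {
            if (boxInfo.portals().contains(portalId)) {
                updates.add(boxInfo.invalidated());
            }
        }
        return updates;
    }

    public static void main(String[] args) {
        // The Map plays the role of the boxInfo table, keyed by box ID.
        Map<String, BoxInfo> store = new LinkedHashMap<>();
        store.put("box-1", new BoxInfo("box-1", Set.of(10, 20), false));
        store.put("box-2", new BoxInfo("box-2", Set.of(30), false));
        store.put("box-3", new BoxInfo("box-3", Set.of(20), false));

        // An invalidation event for portal 20 matches box-1 and box-3.
        for (BoxInfo update : invalidateByPortal(store, 20)) {
            System.out.println(update.id() + " invalid=" + update.invalid());
        }
    }
}
```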

Possibility 2: KTable solution based on the ideas from above
This is the same approach as the ID-based update in Code Block 1, but with the 
BoxInfo KTable re-keyed by the non-key field first. See Code Block 2.
Storing a list as a store value seems to be quite resource intensive and 
therefore not ideal. 
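
To make the resource concern concrete, here is a small plain-Java sketch 
without Kafka Streams. The Map keyed by portal ID stands in for the re-keyed 
store, and the BoxInfo record is again a simplified, hypothetical stand-in. It 
only shows that each BoxInfo ends up stored once per portal it references, so 
the re-keyed store holds more copies than the original table.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RekeyedIndexSketch {

    // Hypothetical, simplified stand-in for BoxInfo; only the fields needed here.
    record BoxInfo(String id, Set<Integer> portals, boolean invalid) {}

    // Builds the "portalId -> list of BoxInfo" view that the re-keyed store holds.
    static Map<Integer, List<BoxInfo>> indexByPortal(Collection<BoxInfo> boxInfos) {
        Map<Integer, List<BoxInfo>> byPortal = new HashMap<>();
        for (BoxInfo boxInfo : boxInfos) {
            for (Integer portalId : boxInfo.portals()) {
                byPortal.computeIfAbsent(portalId, k -> new ArrayList<>()).add(boxInfo);
            }
        }
        return byPortal;
    }

    // Counts how many BoxInfo values the re-keyed view stores in total.
    static int storedCopies(Map<Integer, List<BoxInfo>> byPortal) {
        return byPortal.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        List<BoxInfo> table = List.of(
            new BoxInfo("box-1", Set.of(10, 20), false),
            new BoxInfo("box-2", Set.of(30), false),
            new BoxInfo("box-3", Set.of(20), false));

        Map<Integer, List<BoxInfo>> byPortal = indexByPortal(table);

        // Three table entries become four stored values: box-1 appears twice.
        System.out.println("table entries: " + table.size());
        System.out.println("stored copies: " + storedCopies(byPortal));
    }
}
```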

Possibility 3: KStreams solution
To avoid having a store with key and a list of objects like in solution 2, I 
could read the boxInfo topic as a stream and join with the invalidation stream. 
See Code Block 3.
This solution won't work as written, because with a stream-stream join both 
sides of the join trigger processing. Is there a way to prevent updates on one 
side from triggering the join?

Are there any other possibilities that I did not come up with? This seems like 
quite a common scenario, but I could not find any solutions on the internet, 
and none of the solutions listed here seems like a good one.

Thanks for your feedback.

Best,
Claudia Kesslau



Code Block 1:
_______________________________________________________________________________________________________

KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<String, String> invalidationStream = builder.stream(...);

invalidationStream
            .leftJoin(boxInfos, (aVoid, boxInfo) -> boxInfo)
            .filter((boxId, boxInfo) -> nonNull(boxInfo))
            .mapValues(boxInfo -> boxInfo.setInvalid(true)) 
            .to(boxInfoTableName, ...);
_______________________________________________________________________________________________________


Code Block 2:
_______________________________________________________________________________________________________

KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

KTable<Integer, List<BoxInfo>> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"))
    .groupBy((portalId, boxInfo) -> portalId)
    .aggregate(ArrayList::new, (portalId, boxInfo, boxInfoList) -> {
        boxInfoList.add(boxInfo);
        return boxInfoList;
    }, Materialized.with(Serdes.Integer(), new BoxInfoListSerde()));

invalidationStream
            .leftJoin(boxInfosByPortalId, (aVoid, boxInfo) -> boxInfo)
            .flatMap(this::flatMapByBoxId, Named.as("Stream-BoxInfoById"))
            .mapValues(boxInfo -> boxInfo.setInvalid(true)) 
            .to(boxInfoTableName, ...);


List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    if (null == boxInfo) {
        return List.of();
    }

    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}


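List<KeyValue<String, BoxInfo>> flatMapByBoxId(Integer portalId, List<BoxInfo> boxInfoList) {
    // The left join yields null when no BoxInfo is stored for this portal.
    if (null == boxInfoList) {
        return List.of();
    }

    return boxInfoList.stream()
        .filter(Objects::nonNull)
        .map(boxInfo -> KeyValue.pair(boxInfo.getId(), boxInfo))
        .toList();
}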
_______________________________________________________________________________________________________


Code Block 3:
_______________________________________________________________________________________________________

KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
KStream<Integer, String> invalidationStream = builder.stream(...);

KStream<Integer, BoxInfo> boxInfosByPortalId = boxInfos
    .toStream()
    .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"));

invalidationStream
            .leftJoin(boxInfosByPortalId, (aVoid, boxInfo) -> boxInfo,
                JoinWindows.of(ZERO),
                StreamJoined.with(Serdes.Integer(), Serdes.String(), new BoxInfoSerde()))
            .filter((portalId, boxInfo) -> nonNull(boxInfo))
            .selectKey((portalId, boxInfo) -> boxInfo.getId())
            .mapValues(boxInfo -> boxInfo.setInvalid(true)) 
            .to(boxInfoTableName, ...);

List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
    if (null == boxInfo) {
        return List.of();
    }

    return boxInfo.getPortals()
        .stream()
        .map(portalId -> KeyValue.pair(portalId, boxInfo))
        .toList();
}
_______________________________________________________________________________________________________
