The following link shows an example of how to make a KTable load like a GlobalKTable and fetch the latest record when you have a cyclic relationship.
https://gitlab.com/chad.preisler/kakfainmemorycacheexample

You must use the zero timestamp extractor to make this work. I've only used this where the input stream for the KTable is only used by one application.

Here is the link to the video where I explain why we did this. Seek to 13:58 in the video:
https://www.confluent.io/events/kafka-summit-americas-2021/using-kafka-as-a-database-for-real-time-transaction-processing/

I hope this helps you out.

On Fri, Nov 19, 2021 at 5:22 AM Claudia Kesslau <c.kess...@kasasi.de> wrote:
> Hi Chad,
>
> thanks for your input. I thought about just doing the key mappings in the
> invalidation topics, too. But I guess there is no way around holding the
> primary key values as a list. With just the IDs, though, this should not be
> a problem.
>
> Do you have examples of how you implemented this with the processor API?
>
> Best,
> Claudia
>
>
> -----Original Message-----
> From: Chad Preisler <chad.preis...@gmail.com>
> Sent: Thursday, November 18, 2021 20:48
> To: users@kafka.apache.org
> Subject: Re: KTable updates by non-key field
>
> Hi Claudia,
>
> Looking at your code, you have a cyclic relationship. In other words, you
> are using the same topic for your input and output. If your goal is to
> always get the most up-to-date BoxInfo, it will not work correctly. The
> KStream to KTable join matches records from the stream with records from
> the table that are older. There are some ways to work around this using the
> processor API. Let me know if you want more information about that.
>
> You can do a KTable to KTable join on a foreign key, but that triggers
> from both sides, so that won't work for this situation.
>
> When my team needs to do this, we sometimes write the same record to two
> topics: one with the "primary key" as the topic key and one with the
> "foreign key" as the topic key. That will only work in very specific
> circumstances.
>
> We've also created topics that just have the key values (foreign key as
> the topic key and primary key as the value), then did a KStream to KTable
> join -> map -> through. The through method has been deprecated and
> replaced by repartition.
>
> I think there is definitely a way to do this using some "index" topics and
> the streams API.
>
> Hope this helps.
> Chad
>
> On Mon, Nov 15, 2021 at 7:06 AM Claudia Kesslau <c.kess...@kasasi.de> wrote:
>
> > Hi Kafka community,
> >
> > I'm new to using KTable functionality in KafkaStreams and I'm
> > struggling with updating KTable values by a non-key field.
> >
> > I've got a stream of BoxData that I want to enrich with BoxInfo objects.
> > For those BoxInfo objects I created a compacted topic `boxInfo` and
> > read this topic as a KTable. As both the stream of BoxData and the
> > table of BoxInfo are already keyed on the same ID, joining them works
> > fine.
> >
> > Updating BoxInfo objects based on their ID also works fine. I write
> > events changing BoxInfo objects by their key to a topic, read this
> > topic as a stream, join this stream with the boxInfo KTable and write
> > the adapted BoxInfo back to the compacted topic. See this in Code Block 1.
> > From those change events I can only extract the ID of the changed
> > BoxInfo object, not the changes themselves. Therefore, I just set an
> > invalid flag instead of writing the changes directly. I know this is not
> > ideal.
> >
> > The actual question now is: how can I do such an invalidation if I
> > have to mark as invalid all BoxInfo objects that match on a field other
> > than the key itself?
> >
> > I came up with 3 possible solutions, but none of them seems ideal:
> >
> > Possibility 1: primitive solution
> > For every event indicating that BoxData changed for some field value,
> > just read the whole BoxInfo KTable and mark matching entries. This does
> > not seem very smart or scalable.
> >
> > Possibility 2: KTable solution based on the ideas from above
> > This is the same solution as above, but with re-keying the BoxInfo
> > KTable first. See Code Block 2.
> > Storing a list as a store value seems to be quite resource intensive
> > and therefore not ideal.
> >
> > Possibility 3: KStreams solution
> > To avoid having a store with a key and a list of objects like in
> > solution 2, I could read the boxInfo topic as a stream and join it with
> > the invalidation stream. See Code Block 3.
> > This solution won't work this way, because with a stream-stream join
> > both sides of the join would trigger processing. Is there a way to
> > prevent updates from one side from triggering a join?
> >
> > Are there any other possibilities that I did not come up with? This
> > seems like quite a common scenario, but I could not find any solutions
> > on the internet, and none of the solutions listed here seems like a good
> > one.
> >
> > Thanks for your feedback.
> >
> > Best,
> > Claudia Kesslau
> >
> >
> >
> > Code Block 1:
> >
> > ______________________________________________________________________
> >
> > KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> > KStream<String, String> invalidationStream = builder.stream(...);
> >
> > invalidationStream
> >     // look up the current BoxInfo for the invalidated box ID
> >     .leftJoin(boxInfos, (aVoid, boxInfo) -> boxInfo)
> >     .filter((boxId, boxInfo) -> nonNull(boxInfo))
> >     .mapValues(boxInfo -> boxInfo.setInvalid(true))
> >     // write the flagged BoxInfo back to the compacted topic (the cycle)
> >     .to(boxInfoTableName, ...);
> >
> > ______________________________________________________________________
> >
> >
> > Code Block 2:
> >
> > ______________________________________________________________________
> >
> > KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> > KStream<Integer, String> invalidationStream = builder.stream(...);
> >
> > // re-key BoxInfo by portal ID and collect all boxes per portal
> > KTable<Integer, List<BoxInfo>> boxInfosByPortalId = boxInfos
> >     .toStream()
> >     .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"))
> >     .groupBy((portalId, boxInfo) -> portalId)
> >     .aggregate(ArrayList::new, (portalId, boxInfo, boxInfoList) -> {
> >         boxInfoList.add(boxInfo);
> >         return boxInfoList;
> >     }, Materialized.with(Serdes.Integer(), new BoxInfoListSerde()));
> >
> > invalidationStream
> >     .leftJoin(boxInfosByPortalId, (aVoid, boxInfoList) -> boxInfoList)
> >     // fan the list back out, keyed by box ID
> >     .flatMap(this::flatMapByBoxId, Named.as("Stream-BoxInfoById"))
> >     .mapValues(boxInfo -> boxInfo.setInvalid(true))
> >     .to(boxInfoTableName, ...);
> >
> >
> > List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
> >     if (null == boxInfo) {
> >         return List.of();
> >     }
> >
> >     return boxInfo.getPortals()
> >         .stream()
> >         .map(portalId -> KeyValue.pair(portalId, boxInfo))
> >         .toList();
> > }
> >
> >
> > List<KeyValue<String, BoxInfo>> flatMapByBoxId(Integer portalId, List<BoxInfo> boxInfoList) {
> >     // leftJoin yields null when the portal has no entry in the table
> >     if (null == boxInfoList) {
> >         return List.of();
> >     }
> >
> >     return boxInfoList.stream()
> >         .filter(Objects::nonNull)
> >         .map(boxInfo -> KeyValue.pair(boxInfo.getId(), boxInfo))
> >         .toList();
> > }
> >
> > ______________________________________________________________________
> >
> >
> > Code Block 3:
> >
> > ______________________________________________________________________
> >
> > KTable<String, BoxInfo> boxInfos = builder.table(boxInfoTableName, ...);
> > KStream<Integer, String> invalidationStream = builder.stream(...);
> >
> > KStream<Integer, BoxInfo> boxInfosByPortalId = boxInfos
> >     .toStream()
> >     .flatMap(this::flatMapByPortalId, Named.as("Stream-BoxInfosByPortalId"));
> >
> > invalidationStream
> >     // stream-stream join: records on either side trigger processing
> >     .leftJoin(boxInfosByPortalId, (aVoid, boxInfo) -> boxInfo,
> >         JoinWindows.of(ZERO),
> >         StreamJoined.with(Serdes.Integer(), Serdes.String(), new BoxInfoSerde()))
> >     .filter((portalId, boxInfo) -> nonNull(boxInfo))
> >     .selectKey((portalId, boxInfo) -> boxInfo.getId())
> >     .mapValues(boxInfo -> boxInfo.setInvalid(true))
> >     .to(boxInfoTableName, ...);
> >
> > List<KeyValue<Integer, BoxInfo>> flatMapByPortalId(String boxId, BoxInfo boxInfo) {
> >     if (null == boxInfo) {
> >         return List.of();
> >     }
> >
> >     return boxInfo.getPortals()
> >         .stream()
> >         .map(portalId -> KeyValue.pair(portalId, boxInfo))
> >         .toList();
> > }
> >
> > ______________________________________________________________________
> >
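Chad's advice to "use the zero timestamp extractor" can be sketched with the Kafka Streams `TimestampExtractor` interface. Our reading of the rationale is as follows. Kafka Streams picks the next record from a task's input partitions roughly in timestamp order. Pinning every record of the KTable's source topic to timestamp 0 should therefore get table updates, including the ones written back by the cycle, processed ahead of stream records that carry real timestamps. The class and helper names below are hypothetical and not taken from the linked repository, which may do this differently. The extractor is attached to the table source only, through `Consumed`, rather than globally through `default.timestamp.extractor`, so the stream side keeps its normal timestamps.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: ignores the record's own timestamp and
// reports 0 for every record read from the source it is attached to.
class ZeroTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        return 0L;
    }
}

// Hypothetical helper: registers a KTable whose source topic alone uses the
// zero extractor; all other sources keep the configured default extractor.
final class ZeroTimeTables {
    private ZeroTimeTables() {}

    static <K, V> KTable<K, V> table(StreamsBuilder builder, String topic,
                                     Serde<K> keySerde, Serde<V> valueSerde) {
        return builder.table(topic,
                Consumed.with(keySerde, valueSerde)
                        .withTimestampExtractor(new ZeroTimestampExtractor()));
    }
}
```

In Code Block 1, this would replace the `builder.table(boxInfoTableName, ...)` call with something like `ZeroTimeTables.table(builder, boxInfoTableName, Serdes.String(), boxInfoSerde)`, where `boxInfoSerde` stands for whatever BoxInfo serde the application already uses. This throws away the table records' real timestamps, so any downstream logic that depends on them is affected. Chad's caveat about only doing this when one application consumes the table's input topic still applies.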
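The "index" idea from the thread can be checked in plain Java before it is wired into Kafka Streams. The index is keyed by the foreign key (portal ID), and, following Claudia's conclusion, it holds only primary-key IDs rather than whole BoxInfo values. This is an in-memory model, not Kafka Streams code. One map stands in for the compacted `boxInfo` topic, and the other stands in for a hypothetical portalId-to-boxIds index topic. `BoxInfo` is a simplified stand-in whose fields are assumptions. The model makes one detail explicit: when a box's portal list changes, its ID has to be removed from the old portals' entries. The aggregation in Code Block 2, as written, only ever appends, so its lists would keep accumulating stale copies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of an index keyed by portalId holding boxIds only.
class PortalIndexModel {

    // Simplified, hypothetical stand-in for the thread's BoxInfo.
    record BoxInfo(String id, Set<Integer> portals, boolean invalid) {
        BoxInfo withInvalid(boolean flag) {
            return new BoxInfo(id, portals, flag);
        }
    }

    // Stands in for the compacted boxInfo topic / KTable: boxId -> BoxInfo.
    private final Map<String, BoxInfo> boxInfos = new HashMap<>();
    // Stands in for the index topic: portalId -> boxIds (IDs only).
    private final Map<Integer, Set<String>> boxIdsByPortal = new HashMap<>();

    // Upsert a BoxInfo and keep the index in sync, dropping stale entries.
    void upsert(BoxInfo box) {
        BoxInfo previous = boxInfos.put(box.id(), box);
        if (previous != null) {
            for (Integer portal : previous.portals()) {
                Set<String> ids = boxIdsByPortal.get(portal);
                if (ids != null) {
                    ids.remove(box.id());
                    if (ids.isEmpty()) {
                        boxIdsByPortal.remove(portal);
                    }
                }
            }
        }
        for (Integer portal : box.portals()) {
            boxIdsByPortal.computeIfAbsent(portal, p -> new HashSet<>()).add(box.id());
        }
    }

    // Invalidation keyed by portalId: index lookup, then re-key by boxId
    // and mark each matching BoxInfo invalid. Returns the affected boxIds.
    List<String> invalidatePortal(int portalId) {
        List<String> invalidated = new ArrayList<>();
        for (String boxId : boxIdsByPortal.getOrDefault(portalId, Set.of())) {
            BoxInfo current = boxInfos.get(boxId);
            if (current != null) {
                boxInfos.put(boxId, current.withInvalid(true));
                invalidated.add(boxId);
            }
        }
        return invalidated;
    }

    BoxInfo get(String boxId) {
        return boxInfos.get(boxId);
    }
}
```

In Kafka Streams terms, `invalidatePortal` roughly corresponds to the KStream to KTable join against the index, the map to box-ID keys, and the repartition step Chad describes. That step would be followed by writing the flagged BoxInfo back to the `boxInfo` topic.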