I'm designing a Streams application that provides an API that acts on messages. Messages have a sender.
I have a KStream<Id, Message> and a KTable<SenderId, Sender> The first time a message is sent, you need to ensure the sender exists beforehand. Roughly, void send(Message m) { if (senderTable.get(m.getSenderId())) { senderTopic.put(senderId, createSender(m)); } messageTopic.produce(m.getId(), m); } Unfortunately, there is no ordering guarantee between these topics -- it is entirely possible for message processing to happen before the sender is created in the KTable. I'm trying to work around this, by essentially keeping a CompletableFuture<Sender> and doing: senderTable.foreach((senderId, sender) -> awaiters.get(senderId).complete(sender))); void send(Message m) { if (senderTable.get(m.getSenderId()) == null) { senderTopic.put(senderId, createSender(m)); awaiters.put(id, sender -> realSend(sender, m)); } else { realSend(sender, m); } } void realSend(MessageSender s, Message m) { messageTopic.produce(m.getId(), m); } Unfortunately, it seems that the KTable is wired up in a way that makes this nearly impossible to implement well -- it unconditionally enables a CachingKeyValueStore, which seems to only actually emit updates (and trigger foreach) in batch once per commit interval. This is terrible for the real-time behavior I expect. Clearly I'm thinking about this problem wrong, but I'm not really sure what the most effective route is to fix my design. If the KTable was lower latency (disable caching?) the window of sadness gets much much smaller -- but fundamentally it seems very difficult to track down given a produced message M at what point you can guarantee that processor P has observed message M to enforce any sort of ordering guarantees. Anybody else wrestling with problems like this and have thoughts? Thanks in advance.
signature.asc
Description: Message signed with OpenPGP using GPGMail