I'm designing a Streams application that provides an API that acts on messages. Messages have a sender.
I have a KStream<Id, Message> and a KTable<SenderId, Sender>
The first time a message is sent, you need to ensure the sender
exists beforehand. Roughly,
void send(Message m) {
if (senderTable.get(m.getSenderId())) {
senderTopic.put(senderId, createSender(m));
}
messageTopic.produce(m.getId(), m);
}
Unfortunately, there is no ordering guarantee between these topics --
it is entirely possible for message processing to happen before the sender
is created in the KTable.
I'm trying to work around this, by essentially keeping a
CompletableFuture<Sender>
and doing:
senderTable.foreach((senderId, sender) ->
awaiters.get(senderId).complete(sender)));
void send(Message m) {
if (senderTable.get(m.getSenderId()) == null) {
senderTopic.put(senderId, createSender(m));
awaiters.put(id, sender -> realSend(sender, m));
} else {
realSend(sender, m);
}
}
void realSend(MessageSender s, Message m) {
messageTopic.produce(m.getId(), m);
}
Unfortunately, it seems that the KTable is wired up in a way that makes this
nearly impossible to implement well -- it unconditionally enables a
CachingKeyValueStore, which seems to only actually emit updates (and trigger
foreach)
in batch once per commit interval. This is terrible for the real-time behavior
I expect.
Clearly I'm thinking about this problem wrong, but I'm not really sure what the
most effective
route is to fix my design. If the KTable was lower latency (disable caching?)
the window of
sadness gets much much smaller -- but fundamentally it seems very difficult to
track down
given a produced message M at what point you can guarantee that processor P has
observed message M
to enforce any sort of ordering guarantees.
Anybody else wrestling with problems like this and have thoughts? Thanks in
advance.
signature.asc
Description: Message signed with OpenPGP using GPGMail
