I'm designing a Streams application that provides an API that acts
on messages.  Messages have a sender.

I have a KStream<Id, Message> and a KTable<SenderId, Sender>.
The first time a message is sent, I need to ensure the sender
exists beforehand.  Roughly,

void send(Message m) {
    // Create the sender on first use.
    if (senderTable.get(m.getSenderId()) == null) {
        senderTopic.put(m.getSenderId(), createSender(m));
    }
    messageTopic.produce(m.getId(), m);
}

Unfortunately, there is no ordering guarantee between these topics --
it is entirely possible for message processing to happen before the sender
is created in the KTable.

I'm trying to work around this by essentially keeping a
CompletableFuture<Sender> per sender and doing:

senderTable.foreach((senderId, sender) ->
    awaiters.get(senderId).complete(sender));

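void send(Message m) {
    Sender sender = senderTable.get(m.getSenderId());
    if (sender == null) {
        senderTopic.put(m.getSenderId(), createSender(m));
        // Defer the send until the table reports the new sender.
        awaiters.computeIfAbsent(m.getSenderId(), id -> new CompletableFuture<>())
                .thenAccept(s -> realSend(s, m));
    } else {
        realSend(sender, m);
    }
}

void realSend(Sender s, Message m) {
    messageTopic.produce(m.getId(), m);
}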
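
To make the awaiter idea concrete without any Kafka dependency, here is a
minimal standalone sketch. The class name SenderAwaiters and both method
names are hypothetical, made up for illustration. It assumes one future per
sender id, created by whichever side (the caller or the table callback)
gets there first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical helper: parks callbacks until a sender has been observed. */
class SenderAwaiters<K, S> {
    private final ConcurrentMap<K, CompletableFuture<S>> awaiters =
            new ConcurrentHashMap<>();

    /** Runs the action once the sender for this id is known (maybe at once). */
    CompletableFuture<Void> whenAvailable(K senderId, Consumer<S> action) {
        return awaiters
                .computeIfAbsent(senderId, id -> new CompletableFuture<>())
                .thenAccept(action);
    }

    /** Intended to be called from the table's update callback. */
    void onSenderObserved(K senderId, S sender) {
        // complete() is a no-op if the future was already completed.
        awaiters.computeIfAbsent(senderId, id -> new CompletableFuture<>())
                .complete(sender);
    }
}
```

Using computeIfAbsent on both sides avoids a null lookup when the table
update arrives before anyone is waiting. Note that this sketch never evicts
completed futures, and it only narrows the race; it does not by itself
guarantee that the message processor has seen the sender.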

Unfortunately, it seems that the KTable is wired up in a way that makes this
nearly impossible to implement well -- it unconditionally enables a
CachingKeyValueStore, which seems to emit updates (and trigger foreach)
only in batches, once per commit interval.  This is terrible for the
real-time behavior I expect.

Clearly I'm thinking about this problem wrong, but I'm not really sure what
the most effective route is to fix my design.  If the KTable had lower
latency (disable caching?) the window of sadness would get much, much
smaller -- but fundamentally it seems very difficult to determine, given a
produced message M, at what point you can guarantee that processor P has
observed M, which is what I'd need to enforce any sort of ordering
guarantee.
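
For reference, a sketch of what "disable caching" might look like as
configuration. As I understand the Kafka Streams documentation, the record
cache size is controlled by cache.max.bytes.buffering and flushing is tied
to commit.interval.ms. The keys are written as plain strings here to keep
the snippet self-contained, the class and method names are hypothetical,
and the 100 ms value is an arbitrary assumption rather than a
recommendation. Newer releases may use a different key for the cache size.

```java
import java.util.Properties;

/** Hypothetical holder for low-latency overrides to merge into the app config. */
class LowLatencyStreamsConfig {
    static Properties lowLatencyOverrides() {
        Properties props = new Properties();
        // Assumption: a cache size of 0 turns off record caching, so table
        // updates are forwarded per record instead of once per flush.
        props.put("cache.max.bytes.buffering", "0");
        // Assumption: a shorter commit interval flushes more often; 100 ms
        // is an illustrative value only.
        props.put("commit.interval.ms", "100");
        return props;
    }
}
```

This would only shrink the window; it does not address the underlying
question of when processor P is known to have observed message M.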

Is anybody else wrestling with problems like this, and do you have any
thoughts?  Thanks in advance.
