Messages that don't find a join partner are dropped.

For each incoming message, we do the following:

 1. insert it into its window store
 2. look up the other window store for matching records
    a) if matching records are found, compute the join and emit
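
To make the steps above concrete, here is a minimal plain-Java sketch of
the idea. It is a toy model and not the actual Kafka Streams
implementation: the class WindowedJoinSketch, its methods, and the
in-memory maps standing in for the window stores are all made up for
illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a windowed stream-stream inner join (hypothetical, not Kafka Streams code). */
class WindowedJoinSketch {
    /** A message value together with its timestamp in milliseconds. */
    static final class Timestamped {
        final String value;
        final long ts;
        Timestamped(String value, long ts) { this.value = value; this.ts = ts; }
    }

    private final long windowMs;
    // One in-memory "window store" per input stream, keyed by record key.
    private final Map<String, List<Timestamped>> storeA = new HashMap<>();
    private final Map<String, List<Timestamped>> storeB = new HashMap<>();

    WindowedJoinSketch(long windowMs) { this.windowMs = windowMs; }

    /** Handles a message from stream A and returns the join results it produces. */
    List<String> onA(String key, String value, long ts) {
        return process(storeA, storeB, key, value, ts, true);
    }

    /** Handles a message from stream B and returns the join results it produces. */
    List<String> onB(String key, String value, long ts) {
        return process(storeB, storeA, key, value, ts, false);
    }

    private List<String> process(Map<String, List<Timestamped>> own,
                                 Map<String, List<Timestamped>> other,
                                 String key, String value, long ts, boolean fromA) {
        // Step 1: insert the message into its own window store.
        own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Timestamped(value, ts));
        // Step 2: look up the other store for records with the same key inside the window.
        List<String> joined = new ArrayList<>();
        List<Timestamped> candidates = other.get(key);
        if (candidates != null) {
            for (Timestamped c : candidates) {
                if (Math.abs(c.ts - ts) <= windowMs) {
                    // Step 2a: a match was found, so compute the join (A value first) and emit.
                    joined.add(fromA ? value + c.value : c.value + value);
                }
            }
        }
        return joined; // empty if no partner exists yet
    }

    /** Drops records older than the retention time; nobody is notified about dropped records. */
    void expire(long now, long retentionMs) {
        long cutoff = now - retentionMs;
        storeA.values().forEach(list -> list.removeIf(r -> r.ts < cutoff));
        storeB.values().forEach(list -> list.removeIf(r -> r.ts < cutoff));
    }
}
```

In this model a message without a partner simply sits in its store until
expire() removes it, which mirrors the "dropped without notification"
behavior.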

Note that we maintain all records in the window store until the retention
time passes. Thus, if there is no matching join record, a record will
eventually be dropped. There is no API to be notified about this.

With regard to left/outer join: your observation is correct -- we need to
implement it this way, as it's unclear how long to delay the computation
and wait for a matching record -- note that Streams is able to handle
late data; thus, the only "safe" way to avoid a "double" call would be to
change the implementation to wait until the retention time is over
(default is 24h) -- this implies way too high a latency and also results
in out-of-order results.

To tackle this issue, you could implement a "de-duplication" operator that
consumes the join output stream. This stateful `Transformer` could buffer
all "early" (msg,null) and (null,msg) records for some time to see if
there will be a "proper" join result later. Using punctuation, you can
emit a (msg,null)/(null,msg) join result if you think(!) there will be no
"proper" join result anymore. Note that there always might be a late join
result, and thus this approach has its own issues (of course, you could
drop a late "proper" join result in case you already emitted a
(msg,null)/(null,msg)).

Hope this helps.

-Matthias

On 11/9/17 2:53 PM, Thaler, Michael wrote:
> Hi all,
>
> So let's say I have 2 topics coming that I want to join using KStream#join. I
> set them up like so:
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> a = builder.stream(TOPIC_A);
> KStream<String, String> b = builder.stream(TOPIC_B);
>
> a.join(b, (msgA, msgB) -> msgA + msgB,
>        JoinWindows.of(TimeUnit.HOURS.toMillis(1)))
>  .print();
>
> So this works fine and joins the messages together. But what happens to
> messages that don't find a join partner in the other topic within the window?
> If I get a message in topic A and its partner doesn't occur in B, when and
> how does the message get consumed? Is there a way to write my application so
> that this is caught somehow and handled?
>
> I'm aware that I could use a leftJoin instead, but that would call the merge
> function twice, once with (msgA, null) and the second time with (msgA, msgB).
> I'm trying to find a solution that only calls one or the other.
>
> Is there a way to do this cleanly?
>
> Thanks!
> --Michael Thaler
>
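
As an illustration of the "de-duplication" operator suggested in the
reply, here is a minimal plain-Java sketch of the buffering logic only. It
is not wired into Kafka Streams: the class JoinDedupSketch, the
"key|left|right" output format, and the rule that a result is identified by
its key plus left-side message are assumptions made for this example. It
covers the leftJoin case, i.e. (msg,null); (null,msg) would be handled
symmetrically.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy de-duplication buffer for left-join output (hypothetical, not Kafka Streams code). */
class JoinDedupSketch {
    private final long waitMs;
    // Id of a buffered (msg,null) result -> time at which it was first seen.
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Ids already emitted as (msg,null); a real operator would have to expire these too.
    private final Set<String> emittedUnmatched = new HashSet<>();

    JoinDedupSketch(long waitMs) { this.waitMs = waitMs; }

    /**
     * Called for every record of the join output stream.
     * right == null means an "early" (msg,null) result.
     * Returns the results to forward downstream right away.
     */
    List<String> onJoinResult(String key, String left, String right, long now) {
        String id = key + "|" + left; // assumption: key + left message identifies a result
        List<String> out = new ArrayList<>();
        if (right == null) {
            // Buffer the early result instead of forwarding it.
            if (!emittedUnmatched.contains(id)) {
                pending.putIfAbsent(id, now);
            }
            return out;
        }
        // A "proper" result arrived: discard the buffered (msg,null), if any.
        pending.remove(id);
        // Drop a late proper result if (msg,null) was already emitted for it.
        if (!emittedUnmatched.contains(id)) {
            out.add(id + "|" + right);
        }
        return out;
    }

    /** Stands in for punctuation: emits (msg,null) results that waited at least waitMs. */
    List<String> punctuate(long now) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (now - e.getValue() >= waitMs) {
                out.add(e.getKey() + "|null");
                emittedUnmatched.add(e.getKey());
                it.remove();
            }
        }
        return out;
    }
}
```

The wait time is the guess ("you think(!)") mentioned in the reply: a
longer wait means fewer dropped late results but higher latency for the
unmatched case.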