Hi Damian, thank you for the informative reply. I think this answers 95% of my questions (or maybe 100% and I missed the explanation).
What is still unresolved is how to handle trades and risks that arrive far apart. Suppose we have timeToAllowAJoin = 10 seconds and the following events:

    Time | Trade         | Risk
    0s   |-------------------------------------
    1s   | Trade(t1, v1) |
    4s   | Trade(t2, v1) |
    5s   | Trade(t3, v1) |
    8s   |               | Risk(t2, v1)
    10s  |-------------------------------------
    14s  |               | Risk(t1, v1)
    20s  |-------------------------------------
    27s  |               | Risk(t4, v1)
    30s  |-------------------------------------
    37s  |               | Risk(t3, v1)
    40s  |-------------------------------------
    47s  | Trade(t4, v1) |
    50s  |-------------------------------------

I think `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));` will join Risk(t2, v1) -> Trade(t2, v1) for window 0-10s efficiently, but I don't think I get the other joins, even running `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));` for windows

    10s - 20s
    20s - 30s
    40s - 40s

If this is correct, is there another common way to handle a scenario like the one above?

Thanks in advance,
Peter

On Fri, May 18, 2018 at 6:27 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> In order to join the two streams they need to have the same key and the
> same number of partitions in each topic. If they don't have the same key,
> you can force a repartition by using:
>
> `stream.selectKey(KeyValueMapper)`
>
> If the number of partitions is also different, you could do:
> `stream.selectKey(KeyValueMapper).through("your-new-topic")`
>
> You would need to create "your-new-topic" in advance with the correct
> number of partitions.
>
> Now, assuming that we have the same key and the same number of partitions,
> the join is something like:
>
> `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`
>
> Because the trade and risk have the same key, when a trade or risk event
> arrives you will only join against the corresponding event (within the time
> window specified in the join).
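A plain-Java sketch of the windowed-join rule, assuming the documented semantics of `JoinWindows.of(d)`: two records with the same key join when their timestamps are at most d apart in either direction, measured from each record's own timestamp rather than from fixed 0-10s, 10-20s buckets. It replays Peter's timeline with no Kafka dependency. `JoinWindowSketch`, `Event` and the `tradeId|version` key format are hypothetical, and grace periods, store retention and the incremental (per-arrival) evaluation of the real join are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;

public class JoinWindowSketch {

    // Hypothetical event type: which stream it came from, its join key and its timestamp (seconds).
    static final class Event {
        final boolean isTrade;
        final String key;
        final long ts;

        Event(boolean isTrade, String key, long ts) {
            this.isTrade = isTrade;
            this.key = key;
            this.ts = ts;
        }
    }

    static Event trade(String key, long ts) { return new Event(true, key, ts); }
    static Event risk(String key, long ts) { return new Event(false, key, ts); }

    // Keys of every trade/risk pair that share a key and whose timestamps differ by at
    // most windowSeconds in either direction (a symmetric window, like JoinWindows.of(d)).
    // Batch nested loop for clarity; the real join is evaluated as each record arrives.
    static List<String> windowedJoin(List<Event> events, long windowSeconds) {
        List<String> joined = new ArrayList<>();
        for (Event t : events) {
            if (!t.isTrade) continue;
            for (Event r : events) {
                if (r.isTrade || !t.key.equals(r.key)) continue;
                if (Math.abs(t.ts - r.ts) <= windowSeconds) {
                    joined.add(t.key);
                }
            }
        }
        return joined;
    }

    // Peter's timeline; "tradeId|version" is a hypothetical composite key.
    static List<Event> timeline() {
        return List.of(
            trade("t1|v1", 1), trade("t2|v1", 4), trade("t3|v1", 5), risk("t2|v1", 8),
            risk("t1|v1", 14), risk("t4|v1", 27), risk("t3|v1", 37), trade("t4|v1", 47));
    }

    public static void main(String[] args) {
        System.out.println("10s window: " + windowedJoin(timeline(), 10)); // [t2|v1]
        System.out.println("35s window: " + windowedJoin(timeline(), 35)); // [t1|v1, t2|v1, t3|v1, t4|v1]
    }
}
```

Under this rule only t2 joins with a 10-second window (its records are 4s apart). The t1, t4 and t3 pairs are 13s, 20s and 32s apart, so all four pairs would need a window of at least 32 seconds, which also means keeping that much history in the join's window stores.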
> For example:
>
> Trade <t1, v1>
> Trade <t2, v1>
> Risk <t1, v1> -> join(Trade <t1, v1>)
> Risk <t2, v1> -> join(Trade <t2, v1>)
>
> Note: if multiple events for the same key arrive within the same JoinWindow,
> you will get multiple outputs. However, you can prevent the duplicates from
> going downstream by using `transformValues(..)` after the join. You would
> attach a StateStore to the `transformValues`, i.e., by first creating the
> store and then passing the store name as a parameter to the method. Then,
> when a join result for a given key arrives, your transformer would first
> check the store for an existing result: if there isn't one, update the
> store and send the result downstream; if there is, drop it.
>
> Regards,
> Damian
>
> On Fri, 18 May 2018 at 22:57 Peter Kleinmann <nnamni...@gmail.com> wrote:
>
> > Dear community, sorry in advance for what will be a newbie question:
> >
> > Suppose I have two topics:
> >     trades
> >     risks
> >
> > and I want to join a trade in the trades topic to a risk message in the
> > risks topic by the fields tradeId and version, which exist in both trade
> > and risk messages.
> >
> > It seems I can naturally create streams on top of each topic, but here
> > is the question:
> >
> > Suppose that in one period between time boundaries b0 and b1, trades t1
> > and t2 arrive, and risk r1 matching t1 arrives.
> >
> > In the next period, risk r2 arrives matching t2.
> >
> > a) How do I join r2 to t2?
> >
> > b) How do I avoid reprocessing t1 and r1?
> >
> > I'm going to have between 2 million and 25 million trades and risks a
> > day, so once a trade and risk have been matched, I don't want to handle
> > them again.
> >
> > Do I need to sink the Kafka topics to something like Postgres and have
> >     an unmatched trades table
> >     an unmatched risks table
> >     a matched table?
> >
> > Many Many Thanks in Advance!!!
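Damian's deduplication step (and Peter's wish not to handle a matched trade twice) boils down to a check-then-remember lookup per key. The sketch below shows that logic in plain Java, assuming a `HashMap` as a stand-in for the state store that would be attached to `transformValues(..)`. `DedupSketch`, `forwardIfFirst` and the `tradeId|version` key are hypothetical names for illustration, not Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DedupSketch {

    // Stand-in for the attached state store: join key -> first join result seen.
    private final Map<String, String> seen = new HashMap<>();

    // Returns the result if it is the first join output for this key; empty means "drop it".
    Optional<String> forwardIfFirst(String key, String joinResult) {
        if (seen.containsKey(key)) {
            return Optional.empty();       // already matched: drop the duplicate
        }
        seen.put(key, joinResult);         // remember the match in the "store"
        return Optional.of(joinResult);    // send it downstream
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        // Two join outputs for t1|v1 (e.g. two events in one JoinWindow), one for t2|v1.
        String[][] joinOutputs = {
            {"t1|v1", "Trade(t1, v1) + Risk(t1, v1)"},
            {"t1|v1", "Trade(t1, v1) + Risk(t1, v1) [second]"},
            {"t2|v1", "Trade(t2, v1) + Risk(t2, v1)"}
        };
        for (String[] out : joinOutputs) {
            dedup.forwardIfFirst(out[0], out[1])
                 .ifPresent(r -> System.out.println("downstream: " + r));
        }
    }
}
```

Only the first result per key is printed, so the second t1|v1 output is dropped. As written, the map keeps one entry per matched key forever; at 2 to 25 million trades a day, a real store would also need some way to expire old keys, which the thread does not cover.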