Hi Damian,

Thank you for the informative reply. I think this answers 95% of my
questions (or maybe 100% and I missed the explanation).

What is still unresolved is how to handle trades and risks that arrive far
apart in time.

Suppose we have

timeToAllowAJoin = 10 seconds

and we have

Time | Trade         | Risk
0s   ------------------------------------------
1s   | Trade(t1, v1) |
4s   | Trade(t2, v1) |
5s   | Trade(t3, v1) |
8s   |               | Risk(t2, v1)
10s  ------------------------------------------
14s  |               | Risk(t1, v1)
20s  ------------------------------------------
27s  |               | Risk(t4, v1)
30s  ------------------------------------------
37s  |               | Risk(t3, v1)
40s  ------------------------------------------
47s  | Trade(t4, v1) |
50s  ------------------------------------------


I think

trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));

will efficiently join
Risk(t2, v1) -> Trade(t2, v1)
for the window 0-10s,

but I don't think I get the other joins, even when running

trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));

for the windows
10s - 20s
20s - 30s
40s - 40s
If this is correct, then is there another common way to handle a scenario
like the one above?
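A plain-Java simulation of the timeline above may help pin down which pairs a 10-second join window can match. It is not Kafka Streams code: Event, trades(), risks() and join() are hypothetical names, and it assumes the documented rule for JoinWindows.of(d), namely that two records with the same key join when their timestamps differ by at most d in either direction. Under that assumption the window slides with each record instead of forming fixed 0-10s, 10-20s buckets, and the only pair within 10 seconds is t2 (trade at 4s, risk at 8s).

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only (not Kafka code): models the matching rule of a windowed
// KStream-KStream inner join. A trade and a risk join when they have the same
// key and |tradeTs - riskTs| <= windowSec.
class WindowJoinSim {
    // Hypothetical event: key = tradeId + ":" + version, ts in seconds.
    static final class Event {
        final String key;
        final long ts;
        Event(String key, long ts) { this.key = key; this.ts = ts; }
    }

    // The timeline above: trades at 1s, 4s, 5s, 47s; risks at 8s, 14s, 27s, 37s.
    static List<Event> trades() {
        return List.of(new Event("t1:v1", 1), new Event("t2:v1", 4),
                       new Event("t3:v1", 5), new Event("t4:v1", 47));
    }

    static List<Event> risks() {
        return List.of(new Event("t2:v1", 8), new Event("t1:v1", 14),
                       new Event("t4:v1", 27), new Event("t3:v1", 37));
    }

    static List<String> join(List<Event> trades, List<Event> risks, long windowSec) {
        List<String> out = new ArrayList<>();
        for (Event t : trades) {
            for (Event r : risks) {
                boolean sameKey = t.key.equals(r.key);
                // The window is relative to each record, not a fixed time bucket.
                boolean inWindow = Math.abs(t.ts - r.ts) <= windowSec;
                if (sameKey && inWindow) {
                    out.add(t.key + " (trade " + t.ts + "s, risk " + r.ts + "s)");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(join(trades(), risks(), 10)); // [t2:v1 (trade 4s, risk 8s)]
        System.out.println(join(trades(), risks(), 40).size()); // 4
    }
}
```

In this simulation a 40-second window matches all four pairs. In Kafka Streams a wider window also means the join has to retain more records in its window stores, which matters at 2 to 25 million events a day.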

Thanks in advance,

Peter

On Fri, May 18, 2018 at 6:27 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi,
>
> In order to join the two streams, they need to have the same key and the
> same number of partitions in each topic. If they don't have the same key,
> you can force a repartition by using:
>
> `stream.selectKey(KeyValueMapper)`
>
> If the number of partitions is also different, you could do:
> `stream.selectKey(KeyValueMapper).through("your-new-topic")`
>
> You would need to create "your-new-topic" in advance with the correct
> number of partitions.
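To illustrate the co-partitioning requirement, the sketch below builds a composite tradeId:version key (the kind of key you might produce with selectKey) and maps it to a partition index. It is a simplified model under stated assumptions: Msg, NEW_KEY and partitionFor() are hypothetical, and partitionFor() uses String.hashCode() rather than Kafka's default partitioner, which hashes the serialized key with murmur2. The point it shows holds either way: the same key with the same partition count always gives the same partition index.

```java
import java.util.function.BiFunction;

// Sketch only. NEW_KEY mirrors the kind of lambda you might pass to
// selectKey(..); partitionFor() is a simplified stand-in based on
// String.hashCode(), NOT Kafka's default partitioner (murmur2 of the key bytes).
class CoPartitioningSketch {
    // Hypothetical message shape: only the two fields used for the join key.
    static final class Msg {
        final String tradeId;
        final String version;
        Msg(String tradeId, String version) { this.tradeId = tradeId; this.version = version; }
    }

    // Ignore the incoming key and build a composite key from the value.
    static final BiFunction<String, Msg, String> NEW_KEY =
            (oldKey, msg) -> msg.tradeId + ":" + msg.version;

    // Map a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String tradeKey = NEW_KEY.apply("some-old-trade-key", new Msg("t1", "v1"));
        String riskKey = NEW_KEY.apply("some-old-risk-key", new Msg("t1", "v1"));
        System.out.println(tradeKey); // t1:v1
        // Same key and same partition count: both records land on the same
        // partition index, so one stream task sees both sides of the join.
        System.out.println(partitionFor(tradeKey, 6) == partitionFor(riskKey, 6)); // true
        // With different counts (6 vs 4) nothing guarantees the indices agree.
        int differ = 0;
        for (int i = 0; i < 100; i++) {
            String k = NEW_KEY.apply(null, new Msg("t" + i, "v1"));
            if (partitionFor(k, 6) != partitionFor(k, 4)) differ++;
        }
        System.out.println(differ + " of 100 keys map to different indices with 6 vs 4 partitions");
    }
}
```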
>
> Now assuming that we have the same key and the same number of partitions,
> the join is something like:
>
> `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`
>
> Because the trade and risk have the same key, when a trade or risk event
> arrives you will only join against the corresponding event (within the time
> window specified in the join). For example:
>
> Trade <t1, v1>
> Trade <t2, v1>
> Risk <t1, v1> -> join(Trade <t1, v1>)
> Risk <t2, v1> -> join(Trade <t2, v1>)
>
> Note: if multiple events for the same key arrive within the same JoinWindow,
> you will get multiple outputs. However, you can keep these duplicates from
> going downstream by using `transformValues(..)` after the join. You would
> attach a StateStore to the `transformValues`, i.e., by first creating the
> store and then passing the store name as a parameter to the method. Then,
> when a join result for a given key arrives, your transformer would first
> check the store for an existing result. If there isn't one, update the store
> and send the result downstream; if there is, drop the new result.
>
> Regards,
> Damian
>
> On Fri, 18 May 2018 at 22:57 Peter Kleinmann <nnamni...@gmail.com> wrote:
>
> > Dear community, sorry in advance for what will be a newbie question:
> >
> >
> > Suppose I have two topics:
> > trades
> > risks
> >
> > and I want to join a trade in the trades topic to a risk message in the
> > risks topic by the fields tradeId and version, which exist in both trade
> > and risk messages.
> >
> > It seems I can naturally create streams on top of each topic, but here
> > is the question:
> >
> > Suppose that in one period, between time boundaries b0 and b1, trades t1
> > and t2 arrive, and risk r1 matching t1 arrives.
> >
> > In the next period, risk r2 arrives matching t2.
> >
> > a) How do I join r2 to t2?
> >
> > b) How do I not reprocess t1 and r1?
> >
> > I'm going to have between 2 million and 25 million trades and risks a
> > day, so once a trade and risk have been matched, I don't want to handle
> > them again.
> >
> > Do I need to sink the Kafka topics to something like Postgres and keep
> > the following tables?
> > unmatched trades table
> > unmatched risks table
> > matched table
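One way to picture the three-table idea without a database is an in-memory buffer keyed by tradeId:version: each side waits until the other arrives, however far apart they are, and both entries are removed once matched, so they are not processed again. MatchBuffer and its methods are hypothetical; in Kafka Streams this would roughly correspond to a custom processor backed by two key-value state stores, which is an assumption and not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the "unmatched trades / unmatched risks / matched" tables.
// Unlike a JoinWindows join there is no time bound: an entry waits until its
// partner arrives, then both are removed. A later duplicate with the same key
// would overwrite the pending entry (a simplification of this sketch).
class MatchBuffer {
    private final Map<String, String> unmatchedTrades = new HashMap<>();
    private final Map<String, String> unmatchedRisks = new HashMap<>();
    final List<String> matched = new ArrayList<>();

    void onTrade(String key, String trade) {
        String risk = unmatchedRisks.remove(key);   // partner already waiting?
        if (risk != null) matched.add(key + " " + trade + "+" + risk);
        else unmatchedTrades.put(key, trade);       // otherwise park the trade
    }

    void onRisk(String key, String risk) {
        String trade = unmatchedTrades.remove(key);
        if (trade != null) matched.add(key + " " + trade + "+" + risk);
        else unmatchedRisks.put(key, risk);
    }

    int pending() { return unmatchedTrades.size() + unmatchedRisks.size(); }

    public static void main(String[] args) {
        MatchBuffer b = new MatchBuffer();
        // Arrival order from the timeline at the top of this thread (1s .. 47s).
        b.onTrade("t1:v1", "T1"); b.onTrade("t2:v1", "T2"); b.onTrade("t3:v1", "T3");
        b.onRisk("t2:v1", "R2");  b.onRisk("t1:v1", "R1");  b.onRisk("t4:v1", "R4");
        b.onRisk("t3:v1", "R3");  b.onTrade("t4:v1", "T4");
        System.out.println(b.matched.size() + " matched, " + b.pending() + " pending"); // 4 matched, 0 pending
    }
}
```

The buffers have no time bound, so a real version would need a policy for trades or risks that never get a partner.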
> >
> > Many Many Thanks in Advance!!!
> >
>
