I had a similar use case of joining two streams with windows spanning days.
That didn't work out well.

For you, this approach might work better:
1. Stream Trades and put them in a key/value store (like Aerospike).
2. Stream Risks, and in the map function join each risk with the trade saved
under the same key in Aerospike.

The Aerospike record TTL can stand in for the window size, and you can
additionally keep the event timestamp within the record.
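Here is a minimal sketch of the two steps. It assumes string keys of the hypothetical form "tradeId|version". TtlKvStore is an in-memory stand-in for Aerospike and is not the Aerospike client API. It only models writing a record with a TTL and reading it back by key. The TTL plays the role of the join window, and the event timestamp is stored inside the record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for an Aerospike set: records expire after a TTL.
// This is NOT the Aerospike client API; it only models "put with TTL, get by key".
class TtlKvStore {
    static final class Entry {
        final String value;
        final long eventTimestampMs; // event time kept inside the record
        final long expiresAtMs;
        Entry(String value, long eventTimestampMs, long expiresAtMs) {
            this.value = value;
            this.eventTimestampMs = eventTimestampMs;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Map<String, Entry> records = new HashMap<>();

    void put(String key, String value, long eventTimestampMs, long ttlMs, long nowMs) {
        records.put(key, new Entry(value, eventTimestampMs, nowMs + ttlMs));
    }

    // Returns the live record for the key; expired records are evicted on read.
    Optional<Entry> get(String key, long nowMs) {
        Entry e = records.get(key);
        if (e != null && nowMs >= e.expiresAtMs) {
            records.remove(key);
            return Optional.empty();
        }
        return Optional.ofNullable(e);
    }
}

class TradeRiskLookupJoin {
    private final TtlKvStore store = new TtlKvStore();
    private final long ttlMs; // plays the role of the join window

    TradeRiskLookupJoin(long ttlMs) { this.ttlMs = ttlMs; }

    // Step 1: each trade is written under its (hypothetical) "tradeId|version" key.
    void onTrade(String key, String trade, long eventTsMs, long nowMs) {
        store.put(key, trade, eventTsMs, ttlMs, nowMs);
    }

    // Step 2: each risk looks up the trade stored under the same key.
    Optional<String> onRisk(String key, String risk, long nowMs) {
        return store.get(key, nowMs)
                .map(t -> t.value + " (event time " + t.eventTimestampMs + " ms) + " + risk);
    }
}

class TtlLookupJoinDemo {
    public static void main(String[] args) {
        TradeRiskLookupJoin join = new TradeRiskLookupJoin(24L * 60 * 60 * 1000); // 1-day TTL
        join.onTrade("t1|v1", "Trade(t1, v1)", 1_000, 1_000);
        System.out.println(join.onRisk("t1|v1", "Risk(t1, v1)", 14_000)); // joins despite 13 s gap
        System.out.println(join.onRisk("t4|v1", "Risk(t4, v1)", 27_000)); // empty: trade not stored yet
    }
}
```

This sketch stores only trades. A risk that arrives before its trade finds nothing, as with Risk(t4, v1) at 27 s versus Trade(t4, v1) at 47 s in the timeline quoted below. Storing risks as well and checking from both sides would cover that case.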


On Sun, May 20, 2018 at 12:20 AM Peter Kleinmann <nnamni...@gmail.com>
wrote:

> Hi Damian,
>
> Thank you for the informative reply. I think this answers 95% of my
> questions (or maybe 100% and I missed the explanation).
>
> What is still unresolved is how to handle trades and risks that arrive far
> apart.
>
> Suppose we have
>
> timeToAllowAJoin = 10 seconds
>
> and we have
>
> Time | Trade         | Risk
> 0s   ------------------------------------
> 1s   | Trade(t1, v1) |
> 4s   | Trade(t2, v1) |
> 5s   | Trade(t3, v1) |
> 8s   |               | Risk(t2, v1)
> 10s  ------------------------------------
> 14s  |               | Risk(t1, v1)
> 20s  ------------------------------------
> 27s  |               | Risk(t4, v1)
> 30s  ------------------------------------
> 37s  |               | Risk(t3, v1)
> 40s  ------------------------------------
> 47s  | Trade(t4, v1) |
> 50s  ------------------------------------
>
>
> I think
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
>
> will join
> Risk(t2,v1) -> Trade(t2,v1)
> for window 0-10s efficiently
>
> but I don't think I get the other joins, even running
> trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));
> for windows
> 10s - 20s
> 20s - 30s
> 30s - 40s
> If this is correct, then is there another common way to handle a scenario
> like the one above?
>
> Thanks in advance,
>
> Peter
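In Kafka Streams, JoinWindows.of(d) does not cut time into fixed 0-10 s, 10-20 s buckets. A trade and a risk with the same key join whenever their timestamps are at most d apart, in either direction. The plain-Java model below applies that rule to the timeline above. It is a simulation, not Kafka Streams itself. Grace periods, retention and late-arrival handling are ignored, and keys are shortened to the trade id because every version is v1. With d = 10 s only t2 joins, because it is 4 s apart. The other gaps are 13 s for t1, 20 s for t4 and 32 s for t3. The conclusion above therefore holds, but for that reason. A 32 s window would join all four, at the cost of keeping that much of both streams in the join's state stores.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Kafka Streams) of the pairing rule behind
//   trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin))
// A trade and a risk join when they share a key and their timestamps differ
// by at most the window size, in either direction.
class JoinWindowSimulation {
    static final class Event {
        final String key;        // trade id (version omitted; all are v1 here)
        final long timestampSec;
        Event(String key, long timestampSec) {
            this.key = key;
            this.timestampSec = timestampSec;
        }
    }

    // Returns the key of every (trade, risk) pair that falls inside the window.
    static List<String> joinedKeys(List<Event> trades, List<Event> risks, long windowSec) {
        List<String> joined = new ArrayList<>();
        for (Event t : trades) {
            for (Event r : risks) {
                boolean sameKey = t.key.equals(r.key);
                boolean inWindow = Math.abs(t.timestampSec - r.timestampSec) <= windowSec;
                if (sameKey && inWindow) {
                    joined.add(t.key);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Timeline from the message above, in seconds.
        List<Event> trades = List.of(new Event("t1", 1), new Event("t2", 4),
                                     new Event("t3", 5), new Event("t4", 47));
        List<Event> risks = List.of(new Event("t2", 8), new Event("t1", 14),
                                    new Event("t4", 27), new Event("t3", 37));
        System.out.println(joinedKeys(trades, risks, 10)); // [t2]
        System.out.println(joinedKeys(trades, risks, 32)); // [t1, t2, t3, t4]
    }
}
```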
>
> On Fri, May 18, 2018 at 6:27 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi,
> >
> > In order to join the two streams they need to have the same key and the
> > same number of partitions in each topic. If they don't have the same key
> > you can force a repartition by using:
> >
> > `stream.selectKey(KeyValueMapper)`
> >
> > If the number of partitions is also different, you could do:
> > `stream.selectKey(KeyValueMapper).through("your-new-topic")`
> >
> > You would need to create "your-new-topic" in advance with the correct
> > number of partitions.
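The point about keys and partition counts can be illustrated with a toy partitioner. The sketch below is hypothetical. compositeKey() shows one way a selectKey(..) mapper could combine tradeId and version. partitionFor() uses String.hashCode() as a stand-in, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2, so real partition numbers will differ. The only claim is that one key can land on different partition numbers when the two topics have different partition counts. The join reads matching partition numbers from both inputs, so those records would never meet.

```java
// Hypothetical illustration of why the join needs co-partitioned topics.
// compositeKey() and partitionFor() are assumptions for this sketch;
// Kafka's default partitioner uses murmur2 on the key bytes, not hashCode().
class CoPartitioningDemo {
    // What a selectKey(...) mapper might produce: one key from both join fields.
    static String compositeKey(String tradeId, String version) {
        return tradeId + "|" + version;
    }

    // Stand-in partitioner: same key + same partition count => same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = compositeKey("t1", "v1");
        // A trades topic with 3 partitions vs. a risks topic with 4 partitions:
        System.out.println("trades partition: " + partitionFor(key, 3)); // 0
        System.out.println("risks partition:  " + partitionFor(key, 4)); // 2
    }
}
```

With equal partition counts the two calls always agree. Repartitioning through a correctly sized "your-new-topic" provides that guarantee.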
> >
> > Now assuming that we have the same key and the same number of partitions,
> > the join is something like:
> >
> > `trades.join(risk, valueJoiner, JoinWindows.of(timeToAllowAJoin));`
> >
> > Because the trade and risk have the same key, when a trade or risk event
> > arrives you will only join against the corresponding event (within the
> > time window specified in the join). For example:
> >
> > Trade <t1, v1>
> > Trade <t2, v1>
> > Risk <t1, v1> -> join(Trade <t1, v1>)
> > Risk <t2, v1> -> join(Trade <t2, v1>)
> >
> > Note: if multiple events for the same key arrive within the same
> > JoinWindow, you will get multiple outputs. However, you could prevent
> > these from going downstream by using `transformValues(..)` after the join.
> > You would attach a StateStore to the `transformValues`, i.e., by first
> > creating the store and then passing in the store name as a param to the
> > method. Then, when a join result for a given key arrives, your transformer
> > would first check the store for an existing result. If there isn't one,
> > update the store and send the result downstream; if there is, drop it.
> >
> > Regards,
> > Damian
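Here is a minimal sketch of the deduplication check described above. It assumes the join result is keyed by the same (tradeId, version) key. A HashMap is a hypothetical stand-in for the StateStore. The wiring into `transformValues(..)` (store creation, init, close) is left out. Returning null for duplicates is a choice of this sketch; a filter on non-null values after the transformer would then discard them. The map never forgets a key, so at millions of trades a day some form of expiry would be needed in practice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: forward only the first join result per key.
// The HashMap stands in for the StateStore attached to transformValues(..).
class FirstResultPerKeyFilter {
    private final Map<String, String> store = new HashMap<>();

    // Returns the result if none was stored for this key yet; otherwise null,
    // meaning "drop it" (e.g. filter out nulls after the transformer).
    String process(String key, String joinResult) {
        if (store.containsKey(key)) {
            return null; // a result for this key already went downstream
        }
        store.put(key, joinResult); // remember that this key has been emitted
        return joinResult;
    }
}

class DedupDemo {
    public static void main(String[] args) {
        FirstResultPerKeyFilter filter = new FirstResultPerKeyFilter();
        System.out.println(filter.process("t1|v1", "Trade(t1, v1) + Risk(t1, v1)"));
        System.out.println(filter.process("t1|v1", "Trade(t1, v1) + Risk(t1, v1)")); // null: duplicate
    }
}
```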
> >
> >
> > On Fri, 18 May 2018 at 22:57 Peter Kleinmann <nnamni...@gmail.com>
> > wrote:
> >
> > > Dear community, sorry in advance for what will be a newbie question:
> > >
> > >
> > > Suppose I have two topics,
> > > trades
> > > risks
> > >
> > > and I want to join a trade in the trades topic to a risk message in the
> > > risks topic by the fields tradeId and version, which exist in both trade
> > > and risk messages.
> > >
> > > It seems I can naturally create streams on top of each topic, but here is
> > > the question:
> > >
> > > Suppose in one period between time boundary b0 and b1 trades t1 and t2
> > > arrive, and risk r1 matching t1 arrives.
> > >
> > > In the next period, risk r2 arrives matching t2.
> > >
> > > a) How do I join r2 to t2?
> > >
> > > b) How do I not reprocess t1 and r1?
> > >
> > > I'm going to have between 2 million and 25 million trades and risks a
> > > day, so once a trade and risk have been matched, I don't want to handle
> > > them again.
> > >
> > > Do I need to sink the Kafka topics to something like Postgres, and have
> > > an unmatched trades table,
> > > an unmatched risks table, and
> > > a matched table?
> > > Many Many Thanks in Advance!!!
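The three-table idea can be sketched in memory. Each side waits in its own "unmatched" map until its counterpart arrives, however far apart the two are in time. On a match both entries are removed, so nothing is handled twice. The class and method names are hypothetical. In practice the maps would need to be durable, for example the proposed Postgres tables or Kafka Streams state stores, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of "unmatched trades / unmatched risks / matched".
class TradeRiskMatcher {
    private final Map<String, String> unmatchedTrades = new HashMap<>();
    private final Map<String, String> unmatchedRisks = new HashMap<>();
    private final List<String> matched = new ArrayList<>();

    // A trade either completes a waiting risk or waits for its own risk.
    void onTrade(String key, String trade) {
        String risk = unmatchedRisks.remove(key);
        if (risk != null) {
            matched.add(trade + " + " + risk);
        } else {
            unmatchedTrades.put(key, trade);
        }
    }

    // A risk either completes a waiting trade or waits for its own trade.
    void onRisk(String key, String risk) {
        String trade = unmatchedTrades.remove(key);
        if (trade != null) {
            matched.add(trade + " + " + risk);
        } else {
            unmatchedRisks.put(key, risk);
        }
    }

    List<String> matched() { return matched; }
    int unmatchedTradeCount() { return unmatchedTrades.size(); }
    int unmatchedRiskCount() { return unmatchedRisks.size(); }
}

class MatcherDemo {
    public static void main(String[] args) {
        TradeRiskMatcher m = new TradeRiskMatcher();
        // Arrival order from the timeline quoted above; keys are "tradeId|version".
        m.onTrade("t1|v1", "Trade(t1, v1)");
        m.onTrade("t2|v1", "Trade(t2, v1)");
        m.onTrade("t3|v1", "Trade(t3, v1)");
        m.onRisk("t2|v1", "Risk(t2, v1)");
        m.onRisk("t1|v1", "Risk(t1, v1)");
        m.onRisk("t4|v1", "Risk(t4, v1)");
        m.onRisk("t3|v1", "Risk(t3, v1)");
        m.onTrade("t4|v1", "Trade(t4, v1)");
        System.out.println(m.matched());
    }
}
```

Replaying the timeline in arrival order matches all four pairs. This includes t4, whose risk arrives 20 s before its trade.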
> > >
> >
>
