Cogroup is nice, thanks. But if I define a tumbling window of one day, does that mean flink needs to cache all the data for one day in memory? I have about 5TB of data coming in per day. About 50% of the records will find a matching record (the other 50% won't).
On Thu, Apr 14, 2016 at 9:05 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> right now, Flink does not give you a way to get the records that were
> not joined for a join. You can, however, use a co-group operation instead
> of a join to figure out which records did not join with records from the
> other side and treat them separately.
>
> Let me show an example:
>
> val input1: DataStream[A] = ...
> val input2: DataStream[B] = ...
>
> val result = input1.coGroup(input2)
>   .where(_.key1)
>   .equalTo(_.key2)
>   .window(TumblingEventTimeWindows.of(Time.days(1)))
>   .apply(new MyCoGroupFunction)
>
> class MyCoGroupFunction extends CoGroupFunction[A, B, O] {
>   override def coGroup(first: java.lang.Iterable[A],
>                        second: java.lang.Iterable[B],
>                        out: Collector[O]): Unit = {
>     if (!first.iterator().hasNext) {
>       // no element from the first input matched
>       out.collect(<message telling that I only have second input elements>)
>     } else if (!second.iterator().hasNext) {
>       // no element from the second input matched
>       out.collect(<message telling that I only have first input elements>)
>     } else {
>       // perform the actual join using the two iterables
>     }
>   }
> }
>
> The result will be a stream that contains both join results as well as the
> elements telling you that something didn't join. You can process this
> stream further by splitting it into different streams of only proper join
> results and non-joined elements and so on.
>
> I hope this helps somewhat.
>
> Cheers,
> Aljoscha
>
> On Thu, 14 Apr 2016 at 08:55 Balaji Rajagopalan <
> balaji.rajagopa...@olacabs.com> wrote:
>
>> Let me give you a specific example: say event1 on stream1 happened within
>> your window 0-5 min with key1, and event2 on stream2 with key2 (which
>> could have matched key1) happened at 5:01, outside the join window. Now
>> you will have to correlate event2 on stream2 with event1 on stream1,
>> which happened in the previous window. This was the corner case I
>> mentioned before. I am not aware whether flink can solve this problem for
>> you; that would be nice, instead of solving it in the application.
>>
>> On Thu, Apr 14, 2016 at 12:10 PM, Henry Cai <h...@pinterest.com> wrote:
>>
>>> Thanks Balaji. Do you mean you spill the non-matching records into redis
>>> after 5 minutes? Does flink give you control over which records did not
>>> match in the current window, so that you can copy them into long-term
>>> storage?
>>>
>>> On Wed, Apr 13, 2016 at 11:20 PM, Balaji Rajagopalan <
>>> balaji.rajagopa...@olacabs.com> wrote:
>>>
>>>> You can implement a join in flink (which is an inner join) with the
>>>> pseudo code below. The join below is for a 5-minute interval; yes, there
>>>> will be some corner cases where data coming after 5 minutes is missed
>>>> in the join window. I actually solved this problem by storing some data
>>>> in redis and writing correlation logic to take care of the corner cases
>>>> that were missed in the join window.
>>>>
>>>> val output: DataStream[(OutputData)] =
>>>>   stream1.join(stream2).where(_.key1).equalTo(_.key2)
>>>>     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
>>>>     .apply(new SomeJoinFunction)
>>>>
>>>> On Thu, Apr 14, 2016 at 10:02 AM, Henry Cai <h...@pinterest.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are evaluating different streaming platforms. For a typical join
>>>>> between two streams
>>>>>
>>>>> select a.*, b.*
>>>>> FROM a, b
>>>>> ON a.id == b.id
>>>>>
>>>>> How does flink implement the join?
>>>>> The matching record from either
>>>>> stream can come late; we consider it a valid join as long as the event
>>>>> times for records a and b fall within the same day.
>>>>>
>>>>> I think some streaming platforms (e.g. google data flow) will store the
>>>>> records from both streams in a K/V lookup store and later do the
>>>>> lookup. Is this how flink implements the streaming join?
>>>>>
>>>>> If we need to store all the records in a state store, that's going to
>>>>> be a lot of records for a day.
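
Putting Aljoscha's suggestion together, here is a minimal self-contained sketch of the coGroup approach, including the downstream split into joined and non-joined streams that he mentions. The case classes A and B, the field names key1/key2/ts, and the JoinOutput/TaggingCoGroup names are hypothetical placeholders for illustration, not Flink API; the coGroup/where/equalTo/window/apply chain follows the DataStream Scala API discussed in the thread.

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical record types -- substitute your own schema and key fields.
case class A(key1: String, ts: Long, payload: String)
case class B(key2: String, ts: Long, payload: String)

// Tagged output type so joined and non-joined records travel in one stream
// until we split them downstream.
sealed trait JoinOutput
case class Joined(a: A, b: B)  extends JoinOutput
case class LeftOnly(a: A)      extends JoinOutput
case class RightOnly(b: B)     extends JoinOutput

class TaggingCoGroup extends CoGroupFunction[A, B, JoinOutput] {
  override def coGroup(first: java.lang.Iterable[A],
                       second: java.lang.Iterable[B],
                       out: Collector[JoinOutput]): Unit = {
    val as = first.asScala.toList
    val bs = second.asScala.toList
    if (bs.isEmpty)      as.foreach(a => out.collect(LeftOnly(a)))   // left side found no match
    else if (as.isEmpty) bs.foreach(b => out.collect(RightOnly(b)))  // right side found no match
    else for (a <- as; b <- bs) out.collect(Joined(a, b))            // the actual inner join
  }
}

object CoGroupJoinExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Toy in-memory inputs standing in for the real sources.
    val input1 = env.fromElements(A("k1", 1000L, "a1"), A("k3", 2000L, "a2"))
      .assignAscendingTimestamps(_.ts)
    val input2 = env.fromElements(B("k1", 1500L, "b1"), B("k2", 2500L, "b2"))
      .assignAscendingTimestamps(_.ts)

    val result: DataStream[JoinOutput] = input1
      .coGroup(input2)
      .where(_.key1)
      .equalTo(_.key2)
      .window(TumblingEventTimeWindows.of(Time.days(1)))
      .apply(new TaggingCoGroup)

    // Split the combined stream: proper join results vs. unmatched records.
    val joined    = result.filter(o => o.isInstanceOf[Joined])
    val unmatched = result.filter(o => !o.isInstanceOf[Joined])

    joined.print()
    unmatched.print()
    env.execute("coGroup join with non-joined detection")
  }
}

Tagging the output with a sealed trait keeps everything in one stream until the final filters, which mirrors the "split it into different streams" step Aljoscha describes. Note that the window contents are buffered in the configured state backend until the window fires, so for volumes like 5TB/day a disk-backed state backend is likely the safer choice than heap memory.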