Hi Henry, yes, with early firings you would have the problem of duplicate emission. I'm afraid I don't have a solution for that right now.
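
To make the duplicate-emission problem concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: the class `EarlyFiringDemo`, the helper `joinAll`, and the sample records are all hypothetical. It assumes an early firing that keeps the window contents and re-joins everything buffered so far, so a pair matched at the first firing shows up again at the second.

```java
import java.util.ArrayList;
import java.util.List;

public class EarlyFiringDemo {

    // Joins every buffered left record with every buffered right record
    // that has the same key. Records are modeled as "key:payload" strings.
    static List<String> joinAll(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                if (l.split(":")[0].equals(r.split(":")[0])) {
                    out.add(l + "|" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical window buffers for the two joined streams.
        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();

        // Records that arrived before the first early firing.
        left.add("k1:a1");
        right.add("k1:b1");
        List<String> firstFiring = joinAll(left, right);

        // Records that arrived between the first and second early firing.
        left.add("k2:a2");
        right.add("k2:b2");
        List<String> secondFiring = joinAll(left, right);

        System.out.println(firstFiring);   // [k1:a1|k1:b1]
        System.out.println(secondFiring);  // [k1:a1|k1:b1, k2:a2|k2:b2]
    }
}
```

The pair for `k1` appears in both outputs, which is exactly the repeated emission you want to avoid.
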
For the "another question" I think you are right that this would be session windowing. Please have a look at this blog post that I wrote recently: http://data-artisans.com/session-windowing-in-flink/. And please get back to us if you have more questions or feedback.

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 19:18 Henry Cai <h...@pinterest.com> wrote:

> So is the window defined as hour-window or second-window?
>
> If I am using hour-window, I guess I need to modify the trigger to fire
> early (e.g. every minute)? But I don't want to repeatedly emit the same
> joined records for every minute (i.e. on 2nd minute, I only want to emit
> the changes introduced by new coming records between 1st and 2nd minute)
>
> If I am using second-window, I wasn't sure why the record will still be
> put into the correct window based on hour gap?
>
> Another question is on which type of window, I need to match record a from
> stream a to record b in stream b if abs(a.time - b.time) < 1-hour, so it's
> not really a tumbling window on absolute wall clock, is this a session
> window?
>
> On Fri, Apr 29, 2016 at 4:36 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> you are right, everything will be emitted in a huge burst at the end of
>> the hour. If you want to experiment a bit you can write a custom Trigger
>> based on EventTimeTrigger that will delay firing of windows. You would
>> change onEventTime() to not fire but instead register a processing-time
>> timer at a random point in the future. Then, in onProcessingTime() you
>> would trigger the actual window processing. Elements will still be put into
>> the correct windows based on event time, just the firing of the windows
>> will change by doing this.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 29 Apr 2016 at 08:53 Henry Cai <h...@pinterest.com> wrote:
>>
>>> But the join requirement is to match the records from two streams
>>> occurring within one hour (besides the normal join key condition), if I use
>>> the second join window, those records wouldn't be in the same window any
>>> more.
>>>
>>> On Thu, Apr 28, 2016 at 11:47 PM, Ashutosh Kumar <
>>> kmr.ashutos...@gmail.com> wrote:
>>>
>>>> Time unit can be in seconds as well. Is there specific need to get
>>>> bursts hourly?
>>>>
>>>> On Fri, Apr 29, 2016 at 11:48 AM, Henry Cai <h...@pinterest.com> wrote:
>>>>
>>>>> For the below standard stream/stream join, does flink store the
>>>>> results of stream 1 and stream 2 into state store for the current hour and
>>>>> at the end of the hour window it will fire the window by iterating through
>>>>> all stored elements in the state store to find join matches?
>>>>>
>>>>> My concern is during most of the time in the hour, the output
>>>>> (assuming the output is going to another stream) will be idle and on each
>>>>> hour mark there will be huge outputs of joined records emitted, any way to
>>>>> make it more gradual?
>>>>>
>>>>> dataStream.join(otherStream)
>>>>>     .where(0).equalTo(1)
>>>>>     .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>>>>     .apply (new JoinFunction () {...});
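
As a side note on the abs(a.time - b.time) < 1-hour condition raised in the thread, the following plain-Java sketch (not Flink API; the class `WindowBoundaryDemo` and its helpers are hypothetical) shows why an hourly tumbling window does not express it. It assumes the hourly windows are aligned to the epoch and that timestamps are non-negative milliseconds. Two records only ten minutes apart can still land in different windows when they straddle an hour mark.

```java
public class WindowBoundaryDemo {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Start of the epoch-aligned one-hour tumbling window containing the
    // given non-negative timestamp (assumed alignment, see the note above).
    static long tumblingWindowStart(long timestampMs) {
        return timestampMs - (timestampMs % HOUR_MS);
    }

    // True if both timestamps fall into the same hourly tumbling window.
    static boolean sameTumblingWindow(long aMs, long bMs) {
        return tumblingWindowStart(aMs) == tumblingWindowStart(bMs);
    }

    // The join condition from the thread: abs(a.time - b.time) < 1 hour.
    static boolean withinOneHour(long aMs, long bMs) {
        return Math.abs(aMs - bMs) < HOUR_MS;
    }

    public static void main(String[] args) {
        long a = 55L * 60L * 1000L; // 00:55
        long b = 65L * 60L * 1000L; // 01:05

        System.out.println(withinOneHour(a, b));      // true
        System.out.println(sameTumblingWindow(a, b)); // false
    }
}
```
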