Hi Elias, thanks for the long write-up. It's interesting that it actually kinda works right now.
You might be interested in a design doc that we're currently working on. I posted it on the dev list but here it is: https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing

I'm trying to add support for side inputs. They are excellent for the use case where you want to enrich (join) a main stream with one or several other streams. This would also include support for different windows on the different streams and a mechanism for mapping main windows to the correct side-input windows.

Feedback/suggestions are very welcome on this!

Cheers,
Aljoscha

On Tue, 3 May 2016 at 03:06 Elias Levy <fearsome.lucid...@gmail.com> wrote:

> Thanks for the suggestion. I ended up implementing it a different way.
>
> What is needed is a mechanism to give each stream a different window
> assigner, and then let Flink perform the join normally given the assigned
> windows.
>
> Specifically, for my use case what I need is a sliding window for one
> stream and a trailing window for the other stream. A trailing window is
> just a TimeWindow where the window end time is the event time, rounded up
> or down by some amount, and the window start time is its end time minus
> some given parameter.
>
> For instance:
>
> class TrailingEventTimeWindows(asize: Long, around: Long) extends WindowAssigner[Object, TimeWindow] {
>   val size = asize
>   val round = around
>
>   override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
>     if (timestamp > java.lang.Long.MIN_VALUE) {
>       val end = (timestamp - (timestamp % round)) + round
>       Collections.singletonList(new TimeWindow(end - size, end))
>     } else {
>       // Long.MIN_VALUE is currently assigned when no timestamp is present
>       throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
>         "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
>         "'DataStream.assignTimestampsAndWatermarks(...)'?")
>     }
>   }
>
>   def getSize: Long = size
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
>     EventTimeTrigger.create()
>
>   override def toString: String = s"TrailingEventTimeWindows($size)"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
>     new TimeWindow.Serializer()
> }
>
> object TrailingEventTimeWindows {
>   def of(size: Time, round: Time) =
>     new TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
> }
>
> If the Flink API were different, then I could do something like this to
> join the streams:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> // (time, key, id)
> val events: DataStream[(Int,Int,Int)] = env.fromElements(
>   (1000, 100, 10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
> // (time, file)
> val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300), (4000, 100) )
>
> val windowedKeyedChanges = changes
>   .assignAscendingTimestamps( _._1 )
>   .keyBy(1)
>   .window(TrailingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
>
> val windowedKeyedEvents = events
>   .assignAscendingTimestamps( _._1 )
>   .keyBy(2)
>   .timeWindow(Time.seconds(5), Time.seconds(1))
>
> val results = windowedKeyedEvents.join(windowedKeyedChanges)
>   .apply { }
>
> Alas, the Flink API makes this more complicated. Instead of allowing you
> to join two keyed, windowed streams, you join two unkeyed, unwindowed
> streams and tell it how to key them and join them using
> join().where().equalTo().window().
Since that construct only takes a
> single WindowAssigner, I created a window assigner that uses a different
> assigner for each stream being joined:
>
> class DualWindowAssigner[T1 <: Object, T2 <: Object](
>     assigner1: WindowAssigner[Object, TimeWindow],
>     assigner2: WindowAssigner[Object, TimeWindow]) extends WindowAssigner[Object, TimeWindow] {
>   val windowAssigner1 = assigner1
>   val windowAssigner2 = assigner2
>
>   override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
>     val e = element.asInstanceOf[TaggedUnion[T1,T2]]
>     if (e.isOne) {
>       windowAssigner1.assignWindows(e.getOne, timestamp)
>     } else {
>       windowAssigner2.assignWindows(e.getTwo, timestamp)
>     }
>   }
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
>     EventTimeTrigger.create()
>
>   override def toString: String = s"DualWindowAssigner"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
>     new TimeWindow.Serializer()
> }
>
> Then I can do:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> // (time, key, id)
> val events: DataStream[(Int,Int,Int)] = env.fromElements(
>   (1000, 100, 10), (1500, 300, 20), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
> // (time, key)
> val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300), (4000, 100) )
>
> val eventsWithTime = events.assignAscendingTimestamps( _._1 )
> val changesWithTime = changes.assignAscendingTimestamps( _._1 )
>
> val results = eventsWithTime.join(changesWithTime)
>   .where( _._2 ).equalTo( _._2 )
>   .window(new DualWindowAssigner[Tuple3[Int,Int,Int], Tuple2[Int,Int]](
>     SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)),
>     TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))
>   ))
>   .apply { (x1, x2) => (x1, x2) }
>
> results.print()
>
> This works because Flink considers two TimeWindows the same so long as
> they have the same start and end time. So as long as the sliding and
> trailing windows have the same size and they are rounded correctly, they
> will match.
>
> I think the idea of a trailing window is a powerful one. It would be
> useful if one were included in the Flink API.
>
> Being able to join streams with different window assigners is also
> useful, as evidenced by my use case. Maybe some thought should be given
> to how to support that use case officially.
>
> Thoughts? Comments?
>
> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Sorry for the late reply. You're right that with the windowed join you
>> would have to deal with pairs where the timestamp of (x,y) is not
>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>> windows you would receive duplicates, as you've described. Using tumbling
>> windows would mean that you lose join matches if (x,y) lives in an earlier
>> window. Thus, in order to solve your problem you would have to write a
>> custom stream operator.
>>
>> The stream operator would do the following: collect the inputs from
>> (x,y) and z, which are already keyed. Thus, we know that x = z holds true.
>> Using a priority queue we order the elements, because we don't know how
>> they arrive at the operator. Whenever we receive a watermark indicating
>> that no earlier events can arrive anymore, we can go through the two
>> priority queues to join the elements. The queues are part of the
>> operator's state so that we don't lose information in case of a recovery.
>>
>> I've sketched such an operator here [1]. I hope this helps you to get
>> started.
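The window-matching claim in the thread can be checked without Flink. The sketch below (plain Java, hypothetical names, sliding assignment simplified for non-negative timestamps and zero offset) reproduces both assignment formulas and shows that one of the sliding windows coincides exactly with the trailing window, which is what makes the join fire:

```java
import java.util.ArrayList;
import java.util.List;

// Windows here are identified purely by (start, end), mirroring how
// Flink's TimeWindow equality works. Names are hypothetical stand-ins.
final class WindowMath {
    // Flink-style sliding assignment: every window of `size` containing
    // `timestamp`, at starts that are multiples of `slide` (offset 0,
    // non-negative timestamps assumed for this sketch).
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    // Trailing assignment from earlier in the thread: one window ending
    // at the timestamp rounded up to the next multiple of `round`.
    static long[] trailing(long timestamp, long size, long round) {
        long end = (timestamp - (timestamp % round)) + round;
        return new long[]{end - size, end};
    }
}
```

With size 4000 and slide/round 1000, an element at t = 2500 gets sliding windows starting at 2000, 1000, 0 and -1000, and the trailing window [-1000, 3000) matches the last of them exactly.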
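Till's operator description can be sketched in miniature without any Flink types. The code below is a rough, hypothetical illustration only: both (already keyed) inputs are buffered in timestamp-ordered queues, and pairs are emitted only once a watermark guarantees no earlier element can still arrive. In a real operator the queues would be checkpointed state, and the right buffer would also be pruned; both are omitted here for brevity:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Rough sketch of the buffering join operator described in the thread.
// Event and the string-concatenation "join" are hypothetical
// simplifications, not Flink API.
final class BufferingJoin {
    static final class Event {
        final long timestamp;
        final String value;
        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final PriorityQueue<Event> left =
        new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private final PriorityQueue<Event> right =
        new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    void addLeft(Event e)  { left.add(e); }   // (x, y) input, already keyed
    void addRight(Event e) { right.add(e); }  // z input, same key

    // Called on a watermark: every buffered left element at or before the
    // watermark is final, so join it with each right element that does not
    // come after it. (No pruning of `right` in this sketch.)
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!left.isEmpty() && left.peek().timestamp <= watermark) {
            Event l = left.poll();
            for (Event r : right) {
                if (r.timestamp <= l.timestamp) {
                    out.add(l.value + "-" + r.value);
                }
            }
        }
        return out;
    }
}
```

Elements arriving after the watermark for their timestamp would be late under this scheme, which is exactly the completeness guarantee the watermark machinery is meant to provide.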