Thanks for the suggestion. I ended up implementing it a different way. What is needed is a mechanism to give each stream a different window assigner, and then let Flink perform the join normally given the assigned windows.
Specifically, for my use case what I need is a sliding window for one stream and a trailing window for the other stream. A trailing window is just a TimeWindow whose end time is the event time rounded to some given granularity (rounded up, in the implementation below), and whose start time is the end time minus some given size. For instance:

import java.util.{Collection, Collections}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class TrailingEventTimeWindows(asize: Long, around: Long)
    extends WindowAssigner[Object, TimeWindow] {

  val size = asize
  val round = around

  override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
    if (timestamp > java.lang.Long.MIN_VALUE) {
      // Round the window end up to the next multiple of `round`, then reach
      // back `size` milliseconds for the window start.
      val end = (timestamp - (timestamp % round)) + round
      Collections.singletonList(new TimeWindow(end - size, end))
    } else {
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      throw new RuntimeException(
        "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
        "'DataStream.assignTimestampsAndWatermarks(...)'?")
    }
  }

  def getSize: Long = size

  override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
    EventTimeTrigger.create()

  override def toString: String = s"TrailingEventTimeWindows($size)"

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()
}

object TrailingEventTimeWindows {
  def of(size: Time, round: Time) =
    new TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
}

If the Flink API were different, I could do something like this to join the streams:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// (time, key, id)
val events: DataStream[(Int,Int,Int)] = env.fromElements(
  (1000, 100, 10),
  (2000, 200, 10),
  (3000, 100, 20),
  (4000, 300, 30),
  (7000, 100, 40)
)

// (time, file)
val changes: DataStream[(Int,Int)] = env.fromElements(
  (2000, 300),
  (4000, 100)
)

val windowedKeyedChanges = changes
  .assignAscendingTimestamps( _._1 )
  .keyBy(1)
  .window(TrailingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))

val windowedKeyedEvents = events
  .assignAscendingTimestamps( _._1 )
  .keyBy(2)
  .timeWindow(Time.seconds(5), Time.seconds(1))

// This is the part the current API does not offer: joining two streams that
// are already keyed and windowed.
val results = windowedKeyedEvents.join(windowedKeyedChanges)
  .apply { /* join function */ }

Alas, the Flink API makes this more complicated. Instead of letting you join two keyed, windowed streams, you join two unkeyed, unwindowed streams and tell Flink how to key them and join them using join().where().equalTo().window().
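To make the trailing-window arithmetic concrete, here is a quick standalone check (the numbers are made up for illustration):

val size = 4000L      // window length in ms
val round = 1000L     // round the window end up to the next whole second
val timestamp = 4300L // event time of some record

val end = (timestamp - (timestamp % round)) + round // 4300 rounds up to 5000
val start = end - size                              // 5000 - 4000 = 1000

// The record is assigned exactly one window, [1000, 5000).
println(s"TimeWindow($start, $end)")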
Since that construct only takes a single WindowAssigner, I created a window assigner that uses a different assigner for each stream being joined:

import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion

class DualWindowAssigner[T1 <: Object, T2 <: Object](
    assigner1: WindowAssigner[Object, TimeWindow],
    assigner2: WindowAssigner[Object, TimeWindow])
  extends WindowAssigner[Object, TimeWindow] {

  val windowAssigner1 = assigner1
  val windowAssigner2 = assigner2

  override def assignWindows(element: Object, timestamp: Long): Collection[TimeWindow] = {
    // The join wraps each element in a TaggedUnion that records which input
    // it came from, so we can route it to that side's assigner.
    val e = element.asInstanceOf[TaggedUnion[T1, T2]]
    if (e.isOne) {
      windowAssigner1.assignWindows(e.getOne, timestamp)
    } else {
      windowAssigner2.assignWindows(e.getTwo, timestamp)
    }
  }

  override def getDefaultTrigger(env: JStreamExecutionEnvironment): Trigger[Object, TimeWindow] =
    EventTimeTrigger.create()

  override def toString: String = s"DualWindowAssigner"

  override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] =
    new TimeWindow.Serializer()
}

Then I can do:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// (time, key, id)
val events: DataStream[(Int,Int,Int)] = env.fromElements(
  (1000, 100, 10),
  (1500, 300, 20),
  (2000, 200, 10),
  (3000, 100, 20),
  (4000, 300, 30),
  (7000, 100, 40)
)

// (time, key)
val changes: DataStream[(Int,Int)] = env.fromElements(
  (2000, 300),
  (4000, 100)
)

val eventsWithTime = events.assignAscendingTimestamps( _._1 )
val changesWithTime = changes.assignAscendingTimestamps( _._1 )

val results = eventsWithTime.join(changesWithTime)
  .where( _._2 ).equalTo( _._2 )
  .window(new DualWindowAssigner[Tuple3[Int,Int,Int], Tuple2[Int,Int]](
    SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)),
    TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))
  ))
  .apply { (x1, x2) => (x1, x2) }

results.print()

This works because Flink considers two TimeWindows the same so long as they have the same start and end time. So as long as the sliding and trailing windows have the same size and are rounded correctly, they will match. (There is a small check of this below, after the quoted message.)

I think the idea of a trailing window is a powerful one. It would be useful if one were included in the Flink API. Being able to join streams with different window assigners is also useful, as evidenced by my use case. Maybe some thought should be given to how to support that use case officially.

Thoughts? Comments?

On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Sorry for the late reply. You're right that with the windowed join you
> would have to deal with pairs where the timestamp of (x,y) is not
> necessarily earlier than the timestamp of z. Moreover, by using sliding
> windows you would receive duplicates as you've described. Using tumbling
> windows would mean that you lose join matches if (x,y) lives in an earlier
> window. Thus, in order to solve your problem you would have to write a
> custom stream operator.
>
> The stream operator would do the following: Collecting the inputs from
> (x,y) and z which are already keyed. Thus, we know that x=z holds true.
> Using a priority queue we order the elements because we don't know how
> they arrive at the operator. Whenever we receive a watermark indicating
> that no earlier events can arrive anymore, we can go through the two
> priority queues to join the elements. The queues are part of the
> operator's state so that we don't lose information in case of a recovery.
>
> I've sketched such an operator here [1]. I hope this helps you to get
> started.
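As promised above, a footnote on the claim that the windows match: TimeWindow equality is based purely on the (start, end) pair, which is what lets windows from two different assigners line up in the join. A tiny standalone check (the bounds are arbitrary):

import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Two windows with the same bounds are equal regardless of which assigner
// produced them, so the join's window grouping treats them as one window.
val fromSliding  = new TimeWindow(1000L, 5000L)
val fromTrailing = new TimeWindow(1000L, 5000L)
println(fromSliding == fromTrailing)                   // true
println(fromSliding.hashCode == fromTrailing.hashCode) // true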
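And for anyone who wants to go the custom-operator route instead, below is my rough sketch of the buffer-and-watermark idea Till describes; it is not the code from [1]. It uses the CoProcessFunction API that later Flink releases added, plain fields instead of the managed state Till recommends (so it is not fault tolerant), and a simple key match as the pairing rule rather than any particular ordering of the pairs. The record types Event and Change are hypothetical stand-ins for the tuples above.

import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical record types standing in for the (time, key, ...) tuples.
case class Event(time: Long, key: Int, id: Int)
case class Change(time: Long, key: Int)

class WatermarkJoin extends CoProcessFunction[Event, Change, (Event, Change)] {

  // Plain in-memory buffers keep the sketch short; a real operator would
  // hold these in managed keyed state so they survive a recovery.
  private var events  = Vector.empty[Event]
  private var changes = Vector.empty[Change]

  override def processElement1(
      e: Event,
      ctx: CoProcessFunction[Event, Change, (Event, Change)]#Context,
      out: Collector[(Event, Change)]): Unit = {
    events :+= e
    // Ask to be called back once the watermark passes this element's time.
    ctx.timerService().registerEventTimeTimer(e.time)
  }

  override def processElement2(
      c: Change,
      ctx: CoProcessFunction[Event, Change, (Event, Change)]#Context,
      out: Collector[(Event, Change)]): Unit = {
    changes :+= c
    ctx.timerService().registerEventTimeTimer(c.time)
  }

  override def onTimer(
      ts: Long,
      ctx: CoProcessFunction[Event, Change, (Event, Change)]#OnTimerContext,
      out: Collector[(Event, Change)]): Unit = {
    // The watermark has passed ts, so no record with a timestamp <= ts can
    // still arrive. Join everything buffered up to ts, then drop it.
    for {
      e <- events  if e.time <= ts
      c <- changes if c.time <= ts && c.key == e.key
    } out.collect((e, c))
    events  = events.filter(_.time > ts)
    changes = changes.filter(_.time > ts)
  }
}

Wiring it up would look something like events.keyBy(_.key).connect(changes.keyBy(_.key)).process(new WatermarkJoin). Event-time timers require a keyed stream, which matches Till's premise that the inputs are already keyed.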