Hi Elias,
thanks for the long write-up. It's interesting that it actually kinda works
right now.
You might be interested in a design doc that we're currently working on. I
posted it on the dev list but here it is:
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing
I'm
trying to add support for side inputs. They are excellent for the use case
where you want to enrich (join) a main stream with one or several other
streams. This would also include support for different windows on the
different streams and a mechanism for mapping main windows to the correct
side-input windows.

Feedback/suggestions are very welcome on this!

Cheers,
Aljoscha



On Tue, 3 May 2016 at 03:06 Elias Levy <fearsome.lucid...@gmail.com> wrote:

> Thanks for the suggestion.  I ended up implementing it a different way.
>
> What is needed is a mechanism to give each stream a different window
> assigner, and then let Flink perform the join normally given the assigned
> windows.
>
> Specifically, for my use case what I need is a sliding window for one
> stream and a trailing window for the other stream.  A trailing window is
> just a TimeWindow where the window end time is the event time, rounded up
> or down some amount, and the window start time is is end time minus some
> given parameter.
>
> For instance:
>
> class TrailingEventTimeWindows(asize: Long, around: Long) extends
> WindowAssigner[Object, TimeWindow] {
>   val size  = asize
>   val round = around
>
>   override def assignWindows(element: Object, timestamp: Long):
> Collection[TimeWindow] = {
>     if (timestamp > java.lang.Long.MIN_VALUE) {
>       val end = (timestamp - (timestamp % round)) + round
>       Collections.singletonList(new TimeWindow(end - size, end))
>     } else {
>       // Long.MIN_VALUE is currently assigned when no timestamp is present
>       throw new RuntimeException("Record has Long.MIN_VALUE timestamp (=
> no timestamp marker). " +
>           "Is the time characteristic set to 'ProcessingTime', or did you
> forget to call " +
>           "'DataStream.assignTimestampsAndWatermarks(...)'?")
>     }
>   }
>
>   def getSize: Long = size
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment):
> Trigger[Object, TimeWindow] = EventTimeTrigger.create()
>
>   override def toString: String = s"TrailingEventTimeWindows($size)"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig):
> TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
> }
>
> object TrailingEventTimeWindows {
>   def of(size: Time, round: Time) = new
> TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
> }
>
>
>
> If the Flink API where different, then I could do something like this to
> join the streams:
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     // (time, key,  id)
>     val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
> 10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
>     // (time, file)
>     val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
> (4000, 100) )
>
>     val windowedKeyedChanges = changes
>       .assignAscendingTimestamps( _._1 )
>       .keyBy(1)
>       .window(TrailingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
>
>     val windowedKeyedEvents =
>       events.assignAscendingTimestamps( _._1 )
>         .keyBy(2)
>         .timeWindow(Time.seconds(5), Time.seconds(1))
>
>     val results = windowedKeyedEvents.join(windowedKeyedChanges)
>       .apply { }
>
>
> Alas, the Flink API makes this more complicated.  Instead of allowing you
> to joined to keyed windowed streams, you join two unkeyed unwind owed
> streams and tell it how to key them and join them using
> join().where().equalTo().window().  Since that construct only takes a
> single WindowAssigner I created a window assigner that uses a different
> assigner for each stream being joined:
>
> class DualWindowAssigner[T1 <: Object, T2 <: Object](assigner1:
> WindowAssigner[Object, TimeWindow], assigner2: WindowAssigner[Object,
> TimeWindow]) extends WindowAssigner[Object, TimeWindow] {
>   val windowAssigner1 = assigner1
>   val windowAssigner2 = assigner2
>
>   override def assignWindows(element: Object, timestamp: Long):
> Collection[TimeWindow] = {
>     val e = element.asInstanceOf[TaggedUnion[T1,T2]]
>     if (e.isOne) {
>       windowAssigner1.assignWindows(e.getOne, timestamp)
>     } else {
>       windowAssigner2.assignWindows(e.getTwo, timestamp)
>     }
>   }
>
>   override def getDefaultTrigger(env: JStreamExecutionEnvironment):
> Trigger[Object, TimeWindow] = EventTimeTrigger.create()
>
>   override def toString: String = s"DualWindowAssigner"
>
>   override def getWindowSerializer(executionConfig: ExecutionConfig):
> TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
> }
>
>
> Then I can do:
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     // (time, key,  id)
>     val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
> 10), (1500, 300, 20), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30),
> (7000, 100, 40) )
>     // (time, key)
>     val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
> (4000, 100) )
>
>     val eventsWithTime     = events.assignAscendingTimestamps( _._1 )
>     val changesWithTime = changes.assignAscendingTimestamps( _._1 )
>
>     val results = eventsWithTime.join(changesWithTime)
>       .where( _._2 ).equalTo( _._2 )
>       .window(new DualWindowAssigner[Tuple3[Int,Int,Int],Tuple2[Int,Int]](
>         SlidingEventTimeWindows.of( Time.seconds(4), Time.seconds(1)),
>         TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))
>       ))
>       .apply { (x1, x2) => (x1, x2) }
>
>     results.print()
>
>
> This works as Flink will consider two TimeWindows the same so long as they
> have the same start and end time.  So as long as the sliding and trailing
> windows have the same size and the are rounded to correctly, they will
> match.
>
> I think the idea of a trailing window is a powerful one.  It would be
> useful is one where included in the Flink API.
>
> Being able to join streams with different window assigners is also useful
> as evidenced by my use case.  Maybe some thought should be given on how to
> support that use case officially.
>
> Thoughts? Comments?
>
>
> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> orry for the late reply. You're right that with the windowed join you
>> would have to deal with pairs where the timestamp of (x,y) is not
>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>> windows you would receive duplicates as you've described. Using tumbling
>> windows would mean that you lose join matches if (x,y) lives in an earlier
>> window. Thus, in order to solve your problem you would have to write a
>> custom stream operator.
>>
>> The stream operator would do the following: Collecting the inputs from
>> (x,y) and z which are already keyed. Thus, we know that x=z holds true.
>> Using a priority queue we order the elements because we don't know how the
>> arrive at the operator. Whenever we receive a watermark indicating that no
>> earlier events can arrive anymore, we can go through the two priority
>> queues to join the elements. The queues are part of the operators state so
>> that we don't lose information in case of a recovery.
>>
>> I've sketched such an operator here [1]. I hope this helps you to get
>> started.
>>
>
>
>

Reply via email to