Thanks for the suggestion.  I ended up implementing it a different way.

What is needed is a mechanism to give each stream a different window
assigner, and then let Flink perform the join normally given the assigned
windows.

Specifically, for my use case what I need is a sliding window for one
stream and a trailing window for the other stream.  A trailing window is
just a TimeWindow where the window end time is the event time, rounded up
or down some amount, and the window start time is is end time minus some
given parameter.

For instance:

class TrailingEventTimeWindows(asize: Long, around: Long) extends
WindowAssigner[Object, TimeWindow] {
  val size  = asize
  val round = around

  override def assignWindows(element: Object, timestamp: Long):
Collection[TimeWindow] = {
    if (timestamp > java.lang.Long.MIN_VALUE) {
      val end = (timestamp - (timestamp % round)) + round
      Collections.singletonList(new TimeWindow(end - size, end))
    } else {
      // Long.MIN_VALUE is currently assigned when no timestamp is present
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no
timestamp marker). " +
          "Is the time characteristic set to 'ProcessingTime', or did you
forget to call " +
          "'DataStream.assignTimestampsAndWatermarks(...)'?")
    }
  }

  def getSize: Long = size

  override def getDefaultTrigger(env: JStreamExecutionEnvironment):
Trigger[Object, TimeWindow] = EventTimeTrigger.create()

  override def toString: String = s"TrailingEventTimeWindows($size)"

  override def getWindowSerializer(executionConfig: ExecutionConfig):
TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
}

object TrailingEventTimeWindows {
  def of(size: Time, round: Time) = new
TrailingEventTimeWindows(size.toMilliseconds(), round.toMilliseconds())
}



If the Flink API where different, then I could do something like this to
join the streams:

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (time, key,  id)
    val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
10), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30), (7000, 100, 40) )
    // (time, file)
    val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
(4000, 100) )

    val windowedKeyedChanges = changes
      .assignAscendingTimestamps( _._1 )
      .keyBy(1)
      .window(TrailingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))

    val windowedKeyedEvents =
      events.assignAscendingTimestamps( _._1 )
        .keyBy(2)
        .timeWindow(Time.seconds(5), Time.seconds(1))

    val results = windowedKeyedEvents.join(windowedKeyedChanges)
      .apply { }


Alas, the Flink API makes this more complicated.  Instead of allowing you
to joined to keyed windowed streams, you join two unkeyed unwind owed
streams and tell it how to key them and join them using
join().where().equalTo().window().  Since that construct only takes a
single WindowAssigner I created a window assigner that uses a different
assigner for each stream being joined:

class DualWindowAssigner[T1 <: Object, T2 <: Object](assigner1:
WindowAssigner[Object, TimeWindow], assigner2: WindowAssigner[Object,
TimeWindow]) extends WindowAssigner[Object, TimeWindow] {
  val windowAssigner1 = assigner1
  val windowAssigner2 = assigner2

  override def assignWindows(element: Object, timestamp: Long):
Collection[TimeWindow] = {
    val e = element.asInstanceOf[TaggedUnion[T1,T2]]
    if (e.isOne) {
      windowAssigner1.assignWindows(e.getOne, timestamp)
    } else {
      windowAssigner2.assignWindows(e.getTwo, timestamp)
    }
  }

  override def getDefaultTrigger(env: JStreamExecutionEnvironment):
Trigger[Object, TimeWindow] = EventTimeTrigger.create()

  override def toString: String = s"DualWindowAssigner"

  override def getWindowSerializer(executionConfig: ExecutionConfig):
TypeSerializer[TimeWindow] = new TimeWindow.Serializer()
}


Then I can do:

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // (time, key,  id)
    val events: DataStream[(Int,Int,Int)] = env.fromElements( (1000, 100,
10), (1500, 300, 20), (2000, 200, 10), (3000, 100, 20), (4000, 300, 30),
(7000, 100, 40) )
    // (time, key)
    val changes: DataStream[(Int,Int)] = env.fromElements( (2000, 300),
(4000, 100) )

    val eventsWithTime     = events.assignAscendingTimestamps( _._1 )
    val changesWithTime = changes.assignAscendingTimestamps( _._1 )

    val results = eventsWithTime.join(changesWithTime)
      .where( _._2 ).equalTo( _._2 )
      .window(new DualWindowAssigner[Tuple3[Int,Int,Int],Tuple2[Int,Int]](
        SlidingEventTimeWindows.of( Time.seconds(4), Time.seconds(1)),
        TrailingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))
      ))
      .apply { (x1, x2) => (x1, x2) }

    results.print()


This works as Flink will consider two TimeWindows the same so long as they
have the same start and end time.  So as long as the sliding and trailing
windows have the same size and the are rounded to correctly, they will
match.

I think the idea of a trailing window is a powerful one.  It would be
useful is one where included in the Flink API.

Being able to join streams with different window assigners is also useful
as evidenced by my use case.  Maybe some thought should be given on how to
support that use case officially.

Thoughts? Comments?


On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org>
wrote:

> orry for the late reply. You're right that with the windowed join you
> would have to deal with pairs where the timestamp of (x,y) is not
> necessarily earlier than the timestamp of z. Moreover, by using sliding
> windows you would receive duplicates as you've described. Using tumbling
> windows would mean that you lose join matches if (x,y) lives in an earlier
> window. Thus, in order to solve your problem you would have to write a
> custom stream operator.
>
> The stream operator would do the following: Collecting the inputs from
> (x,y) and z which are already keyed. Thus, we know that x=z holds true.
> Using a priority queue we order the elements because we don't know how the
> arrive at the operator. Whenever we receive a watermark indicating that no
> earlier events can arrive anymore, we can go through the two priority
> queues to join the elements. The queues are part of the operators state so
> that we don't lose information in case of a recovery.
>
> I've sketched such an operator here [1]. I hope this helps you to get
> started.
>

Reply via email to