I created a small test to see if I could replicate this... but I couldn't
:-) Below is my code, which provides a counterexample. It is not very clean,
but perhaps it will be useful to someone else in the future:


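import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.scalatest.{FunSuite, Matchers}

// Test record: two keys plus an event-time timestamp in milliseconds.
// (Assumed definition; it was not included in the original message.)
case class Tester(id1: String, id2: String, time: Long)

class SessionWindowTest extends FunSuite with Matchers {

  test("Should advance watermark correctly") {

    val startTime = 0L

    // The last element (time 1) arrives after the watermark has already
    // reached 100, so the event-time windows are expected to drop it as late.
    val elements1 = List[Tester](
      Tester("id1:a", "id2:a", startTime),
      Tester("id1:b", "id2:a", startTime + 1),
      Tester("id1:b", "id2:a", startTime + 100),
      Tester("id1:a", "id2:a", startTime + 1)
    )

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.disableSysoutLogging()

    // Uses each element's own time as its timestamp and emits a watermark
    // after every element.
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Tester] {

      override def extractTimestamp(element: Tester, previousElementTimestamp: Long): Long =
        element.time

      override def checkAndGetNextWatermark(lastElement: Tester, extractedTimestamp: Long): Watermark =
        new Watermark(extractedTimestamp)
    }

    // Bounded source: emits the test elements and then finishes.
    val stream = streamEnv.addSource(new SourceFunction[Tester] {
      override def run(ctx: SourceFunction.SourceContext[Tester]): Unit =
        elements1.foreach(ctx.collect)

      override def cancel(): Unit = {}
    }).assignTimestampsAndWatermarks(new PunctuatedAssigner)

    // Stage 1: session windows per id1; emit the last element of each session.
    val sessionsStream = stream
      .keyBy(_.id1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2)))
      .apply { (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) =>
        val elements = iter.toList
        println(s"Session window. Elements: $elements")
        println(windowInfo)
        collector.collect(elements.last)
      }

    // Stage 2: tumbling 2 ms windows per id2 over the session results.
    val countStream = sessionsStream
      .keyBy(_.id2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply { (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) =>
        val elements = iter.toList
        println(s"Tumbling window. Elements: $elements, start: ${windowInfo.getStart}, end: ${windowInfo.getEnd}")
        collector.collect(elements.last)
      }

    sessionsStream.print()
    countStream.print()

    streamEnv.execute()
  }
}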


On Tue, Feb 26, 2019 at 10:49 PM Padarn Wilson <pad...@gmail.com> wrote:

> Okay. I think I must still be misunderstanding something here. I will work on
> building a unit test around this; hopefully this clears up my confusion.
>
> Thank you,
> Padarn
>
> On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Operators with multiple inputs emit the minimum of their inputs'
>> watermarks downstream. In the case of a keyBy, this means that the
>> watermark is sent to all downstream consumers.
>>
>> Cheers,
>> Till
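Till's description can be illustrated with a simplified plain-Java model (no Flink dependency; the class and method names are hypothetical). The operator keeps one watermark per input channel and forwards the minimum. In Flink the channels correspond to upstream parallel subtasks, not to keys, which is why a keyBy does not split the watermark.

```java
import java.util.Arrays;

// Simplified model (not Flink's actual classes) of how an operator with
// several input channels derives its event-time clock from the watermarks
// arriving on each channel. After a keyBy, each upstream parallel subtask
// is one input channel of every downstream subtask.
final class MinWatermarkOperator {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkOperator(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark seen yet on any channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's
    // resulting watermark, which would then be forwarded downstream.
    long onWatermark(int channel, long watermark) {
        // Per-channel watermarks only move forward.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The slowest channel bounds the operator's event time.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }
}
```

With two channels, a watermark of 100 on channel 0 alone leaves the operator at Long.MIN_VALUE; once channel 1 reports 50, the operator advances to 50.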
>>
>> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Just to add: by printing intermediate results I see that I definitely
>>> have more than five minutes of data, and by windowing without the session
>>> windows I see that event time watermarks do seem to be generated as
>>> expected.
>>>
>>> Thanks for your help and time.
>>>
>>> Padarn
>>>
>>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I will work on an example, but I’m a little confused about how keyBy and
>>>> watermarks work in this case. This documentation
>>>> (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams)
>>>> says:
>>>>
>>>>
>>>> Some operators consume multiple input streams; a union, for example, or
>>>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>>>> operator’s current event time is the minimum of its input streams’ event
>>>> times. As its input streams update their event times, so does the operator.
>>>>
>>>>
>>>> This implies to me that the keyBy splits the watermark?
>>>>
>>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> Flink does not generate watermarks per key. At the moment, watermarks are
>>>>> always global. Therefore, I suspect the problem is rather that no
>>>>> watermarks are generated at all. Could it be that your input data does not
>>>>> span a period longer than 5 minutes and also does not terminate? Another
>>>>> problem could be the CountTrigger, which does not react to the window's
>>>>> end time: its onEventTime method simply returns TriggerResult.CONTINUE, and
>>>>> I think this will cause the window to never fire. Maybe a working example
>>>>> program with example input could be helpful for further debugging.
>>>>>
>>>>> Cheers,
>>>>> Till
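The CountTrigger point can be made concrete with a small plain-Java model (hypothetical names, no Flink dependency) of two trigger policies: one mirroring CountTrigger, which fires every maxCount elements and returns CONTINUE from onEventTime, and one mirroring EventTimeTrigger, which fires when the timer for the window's maxTimestamp goes off. A window that receives fewer than maxCount elements never fires under the first policy, even after the watermark passes the window end.

```java
// Plain-Java model of two trigger policies (not the Flink API; the names
// only mirror the behavior of Flink's CountTrigger and EventTimeTrigger).
final class TriggerModel {
    enum Result { CONTINUE, FIRE }

    interface Policy {
        Result onElement(long countSoFar);
        Result onEventTime(long timerTime, long windowMaxTimestamp);
    }

    // Like CountTrigger: fires every maxCount elements (the count resets
    // after each firing) and ignores event time entirely.
    static Policy countTrigger(final long maxCount) {
        return new Policy() {
            public Result onElement(long countSoFar) {
                return countSoFar % maxCount == 0 ? Result.FIRE : Result.CONTINUE;
            }
            public Result onEventTime(long timerTime, long windowMaxTimestamp) {
                return Result.CONTINUE;
            }
        };
    }

    // Like EventTimeTrigger: ignores counts, fires at the window end.
    static Policy eventTimeTrigger() {
        return new Policy() {
            public Result onElement(long countSoFar) {
                return Result.CONTINUE;
            }
            public Result onEventTime(long timerTime, long windowMaxTimestamp) {
                return timerTime == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
            }
        };
    }

    // Feeds n elements into one window, then lets the watermark pass the
    // window end; returns how many times the window fired.
    static int countFirings(Policy policy, long n, long windowMaxTimestamp) {
        int fires = 0;
        for (long i = 1; i <= n; i++) {
            if (policy.onElement(i) == Result.FIRE) fires++;
        }
        if (policy.onEventTime(windowMaxTimestamp, windowMaxTimestamp) == Result.FIRE) fires++;
        return fires;
    }
}
```

For example, countFirings(countTrigger(5), 3, 999) returns 0, while countFirings(eventTimeTrigger(), 3, 999) returns 1.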
>>>>>
>>>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flink Mailing List,
>>>>>>
>>>>>> Long story short: I want to somehow collapse watermarks at an
>>>>>> operator across keys, so that keys with lagging watermarks do not hold
>>>>>> the other keys back. Details below:
>>>>>>
>>>>>> ---
>>>>>>
>>>>>> I have an application in which I want to perform the following sequence
>>>>>> of steps, assuming each record has the form (time, user, location,
>>>>>> action):
>>>>>>
>>>>>> -> Read source
>>>>>> -> KeyBy (UserId, Location)
>>>>>> -> EventTimeSessionWindow (5 min gap), resulting in (user, location) sessions
>>>>>> -> Trigger on first event
>>>>>> -> KeyBy (Location)
>>>>>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>>>>>> -> Count
>>>>>>
>>>>>> The end goal is to count the number of unique users in a given
>>>>>> location; the EventTimeSessionWindow is used to make sure users are only
>>>>>> counted once.
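As a reference point for what the pipeline above should compute, here is a batch sketch in plain Java (hypothetical names, no Flink dependency). It keeps only the first event of each (user, location) session and counts those per location in one time window. It deliberately ignores watermarks and late data, simplifies session-boundary handling, and evaluates a single window rather than every 5-second slide.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Batch sketch (not Flink code) of the intended result: count each
// (user, location) session once, then count sessions per location whose
// first event falls in a given time window.
final class UniqueVisitorSketch {
    static final class Event {
        final long time; // event time in milliseconds
        final String user;
        final String location;

        Event(long time, String user, String location) {
            this.time = time;
            this.user = user;
            this.location = location;
        }
    }

    // Returns the first event of every (user, location) session. A new
    // session starts when the gap to the previous event of the same key
    // exceeds gapMillis.
    static List<Event> sessionStarts(List<Event> events, long gapMillis) {
        Map<String, List<Event>> byKey = new HashMap<>();
        for (Event e : events) {
            byKey.computeIfAbsent(e.user + "|" + e.location, k -> new ArrayList<>()).add(e);
        }
        List<Event> starts = new ArrayList<>();
        for (List<Event> group : byKey.values()) {
            group.sort(Comparator.comparingLong(e -> e.time));
            Event previous = null;
            for (Event e : group) {
                if (previous == null || e.time - previous.time > gapMillis) {
                    starts.add(e); // "trigger on first event" of the session
                }
                previous = e;
            }
        }
        return starts;
    }

    // Counts session starts per location in [windowStart, windowStart + sizeMillis).
    static Map<String, Integer> countPerLocation(List<Event> starts, long windowStart, long sizeMillis) {
        Map<String, Integer> counts = new HashMap<>();
        for (Event e : starts) {
            if (e.time >= windowStart && e.time < windowStart + sizeMillis) {
                counts.merge(e.location, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

With a 5-minute gap, events for user u1 at location A at 0 s and 60 s form one session, while a further event at 400 s starts a new one.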
>>>>>>
>>>>>> So I created a custom Trigger, which is the same as CountTrigger but
>>>>>> has the following `onElement` function:
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onElement(Object element, long timestamp, W window,
>>>>>>                                TriggerContext ctx) throws Exception {
>>>>>>   // Per-window element count, kept in the trigger's partitioned state.
>>>>>>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>>>   count.add(1L);
>>>>>>   if (count.get() == maxCount) {
>>>>>>     // The maxCount-th element: emit the window contents, then clear them.
>>>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>>>   } else if (count.get() > maxCount) {
>>>>>>     // Later elements are discarded without firing again.
>>>>>>     return TriggerResult.PURGE;
>>>>>>   }
>>>>>>   return TriggerResult.CONTINUE;
>>>>>> }
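The custom trigger's onElement decisions can be traced with a plain-Java model (hypothetical names, no Flink dependency). With maxCount = 1 it emits the first element of a session once and discards the rest. The model assumes the count survives FIRE_AND_PURGE; as far as I understand, in Flink purging clears the window contents but not the trigger's own partitioned state, which is only cleared when the window is cleaned up.

```java
// Plain-Java model of the custom trigger's onElement logic (not the Flink
// API). The count field stands in for the ReducingState<Long>.
final class FirstEventTriggerModel {
    enum Result { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long maxCount;
    private long count = 0;

    FirstEventTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element added to the window.
    Result onElement() {
        count += 1;
        if (count == maxCount) {
            return Result.FIRE_AND_PURGE; // emit once, then drop the contents
        } else if (count > maxCount) {
            return Result.PURGE; // drop later elements without firing
        }
        return Result.CONTINUE;
    }
}
```

With maxCount = 2 the sequence is CONTINUE, FIRE_AND_PURGE, then PURGE for every later element.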
>>>>>>
>>>>>> But my final SlidingEventTimeWindow does not fire properly. This is
>>>>>> because (I assume) some users have session windows that are not
>>>>>> closed, so the watermark for those keys is running behind, and the
>>>>>> SlidingEventTimeWindow watermark is held back too.
>>>>>>
>>>>>> What I want to achieve is essentially to set the watermark of the
>>>>>> SlidingEventTimeWindow operator to the maximum (minus some lateness) of
>>>>>> the input keys' watermarks rather than the minimum, but I cannot tell
>>>>>> whether this is possible and, if not, what another approach could be.
>>>>>>
>>>>>> Thanks,
>>>>>> Padarn
>>>>>>
>>>>>
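The alternative described in the original question can be compared with Flink's behavior in a short plain-Java sketch (hypothetical names, no Flink dependency). maxMinusLateness is not a Flink option; it only illustrates the trade-off. Note also that, per Till's reply, the inputs being combined in Flink are upstream channels rather than keys.

```java
// Compares Flink's min-combining of input watermarks with a hypothetical
// max-minus-lateness rule. The second rule is not a Flink setting.
final class WatermarkCombiner {
    // What Flink does: the slowest input bounds the operator's event time.
    static long minCombine(long[] inputWatermarks) {
        long result = Long.MAX_VALUE;
        for (long w : inputWatermarks) result = Math.min(result, w);
        return result;
    }

    // Hypothetical alternative: follow the fastest input, minus a fixed lateness.
    static long maxMinusLateness(long[] inputWatermarks, long latenessMillis) {
        long max = Long.MIN_VALUE;
        for (long w : inputWatermarks) max = Math.max(max, w);
        return max - latenessMillis;
    }

    // Simplified lateness check: a watermark t promises no more elements with
    // timestamp <= t, so anything at or below it counts as late.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }
}
```

With input watermarks of 1,000 and 600,000 ms and a lateness of 60,000 ms, an element at 2,000 ms from the slow input is on time under min-combining (watermark 1,000) but late under max-minus-lateness (watermark 540,000). In other words, the alternative stops lagging inputs from holding the operator back by dropping their records instead.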
