I created a small test to see if I could replicate this... but I couldn't :-) Below is my code, which provides a counterexample. It is not very clean, but perhaps it will be useful to someone else in the future:
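import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.scalatest.{FunSuite, Matchers}

// Test record: id1 keys the session windows, id2 keys the tumbling windows, time is the event time
case class Tester(id1: String, id2: String, time: Long)

class SessionWindowTest extends FunSuite with Matchers {

  test("Should advance watermark correctly") {
    val startTime = 0L
    val elements1 = List[Tester](
      Tester("id1:a", "id2:a", startTime),
      Tester("id1:b", "id2:a", startTime + 1),
      Tester("id1:b", "id2:a", startTime + 100),
      Tester("id1:a", "id2:a", startTime + 1)
    )

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.getConfig.disableSysoutLogging()

    // Emits a watermark equal to the timestamp of every element
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[Tester] {
      override def extractTimestamp(element: Tester, previousElementTimestamp: Long): Long =
        element.time

      override def checkAndGetNextWatermark(lastElement: Tester, extractedTimestamp: Long): Watermark =
        new Watermark(extractedTimestamp)
    }

    val stream = streamEnv
      .addSource(new SourceFunction[Tester] {
        override def run(ctx: SourceFunction.SourceContext[Tester]): Unit =
          elements1.foreach(ctx.collect)

        override def cancel(): Unit = {}
      })
      .assignTimestampsAndWatermarks(new PunctuatedAssigner)

    // Session windows per id1 with a 2 ms gap; emit the last element of each session
    val sessionsStream = stream
      .keyBy(_.id1)
      .window(EventTimeSessionWindows.withGap(Time.milliseconds(2)))
      .apply { (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) =>
        val elements = iter.toList
        println(s"Session window. Elements: $elements")
        println(windowInfo)
        collector.collect(elements.last)
      }

    // Tumbling 2 ms windows per id2 over the session output
    val countStream = sessionsStream
      .keyBy(_.id2)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
      .apply { (key: String, windowInfo: TimeWindow, iter: Iterable[Tester], collector: Collector[Tester]) =>
        val elements = iter.toList
        println(s"Tumbling window. Elements: $elements, ${windowInfo.getStart}, ${windowInfo.getEnd}")
        collector.collect(elements.last)
      }

    sessionsStream.print()
    countStream.print()
    streamEnv.execute()
  }
}

On Tue, Feb 26, 2019 at 10:49 PM Padarn Wilson <pad...@gmail.com> wrote:

> Okay. I think I still must misunderstand something here.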
> I will work on building a unit test around this; hopefully this clears up
> my confusion.
>
> Thank you,
> Padarn
>
> On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Operators with multiple inputs emit the minimum of their inputs'
>> watermarks downstream. In the case of a keyBy, this means that the
>> watermark is sent to all downstream consumers.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson <pad...@gmail.com> wrote:
>>
>>> Just to add: by printing intermediate results I see that I definitely
>>> have more than five minutes of data, and by windowing without the session
>>> windows I see that event-time watermarks do seem to be generated as
>>> expected.
>>>
>>> Thanks for your help and time.
>>>
>>> Padarn
>>>
>>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I will work on an example, but I’m a little confused by how keyBy and
>>>> watermarks work in this case. This documentation says
>>>> (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams):
>>>>
>>>> Some operators consume multiple input streams; a union, for example, or
>>>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>>>> operator’s current event time is the minimum of its input streams’ event
>>>> times. As its input streams update their event times, so does the operator.
>>>>
>>>> This implies to me that the keyBy splits the watermark?
>>>>
>>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Padarn,
>>>>>
>>>>> Flink does not generate watermarks per key. At the moment, watermarks
>>>>> are always global. Therefore, I would suspect that the problem is rather
>>>>> that watermarks are not being generated at all. Could it be that your
>>>>> input data does not span a period longer than 5 minutes and also does
>>>>> not terminate?
>>>>> Another problem could be the CountTrigger, which does not react to the
>>>>> window's end time: its onEventTime method simply returns
>>>>> TriggerResult.CONTINUE, and I think this will cause the window to not
>>>>> fire. Maybe a working example program with example input could be
>>>>> helpful for further debugging.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson <pad...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flink Mailing List,
>>>>>>
>>>>>> Long story short: I want to somehow collapse watermarks at an
>>>>>> operator across keys, so that keys with lagging watermarks do not hold
>>>>>> the others back. Details below:
>>>>>>
>>>>>> ---
>>>>>>
>>>>>> I have an application in which I want to perform the following
>>>>>> sequence of steps. Assume each record has the fields (time, user,
>>>>>> location, action):
>>>>>>
>>>>>> -> Read source
>>>>>> -> KeyBy (UserId, Location)
>>>>>> -> EventTimeSessionWindow (5 min gap), which results in (User, Location,
>>>>>> Session)
>>>>>> -> Trigger on first event
>>>>>> -> KeyBy (Location)
>>>>>> -> SlidingEventTimeWindow (5 min length, 5 second slide)
>>>>>> -> Count
>>>>>>
>>>>>> The end intention is to count the number of unique users in a given
>>>>>> location; the EventTimeSessionWindow is used to make sure users are
>>>>>> only counted once.
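The pipeline above can be cross-checked against a plain-Java reference model that needs no Flink runtime, for example as a test oracle. This is a minimal sketch under stated assumptions, not Flink code: events are processed in timestamp order, a new (user, location) session starts only when the gap since that user's previous event at the location exceeds 5 minutes, only the first event of each session is kept (mirroring the trigger-on-first step), and sliding windows are aligned to multiples of the 5-second slide. The class `UniqueUsersSketch`, its `Event` type and the `count` method are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java reference model (no Flink) of the session -> sliding-window count pipeline.
// All names are hypothetical and exist only for this illustration.
public class UniqueUsersSketch {

    // Hypothetical record for (time, user, location); the action field is not needed here.
    public static final class Event {
        final long time;
        final String user;
        final String location;

        public Event(long time, String user, String location) {
            this.time = time;
            this.user = user;
            this.location = location;
        }
    }

    static final long GAP = 5 * 60_000L;   // assumed session gap: 5 minutes
    static final long SIZE = 5 * 60_000L;  // assumed sliding window length: 5 minutes
    static final long SLIDE = 5_000L;      // assumed slide: 5 seconds

    // Returns location -> (window start -> number of sessions whose first event lies in that window).
    public static Map<String, SortedMap<Long, Integer>> count(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort((a, b) -> Long.compare(a.time, b.time)); // assumption: in-order processing

        // Sessions per (user, location): keep only the first event of each session.
        Map<List<String>, Long> lastSeen = new HashMap<>();
        List<Event> sessionStarts = new ArrayList<>();
        for (Event e : sorted) {
            Long prev = lastSeen.put(Arrays.asList(e.user, e.location), e.time);
            if (prev == null || e.time - prev > GAP) {
                sessionStarts.add(e);
            }
        }

        // Key by location and count each session start in every aligned sliding window containing it.
        Map<String, SortedMap<Long, Integer>> counts = new HashMap<>();
        for (Event e : sessionStarts) {
            long lastStart = e.time - Math.floorMod(e.time, SLIDE);
            for (long start = lastStart; start > e.time - SIZE; start -= SLIDE) {
                counts.computeIfAbsent(e.location, k -> new TreeMap<>()).merge(start, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0L, "u1", "A"),
                new Event(1_000L, "u1", "A"),   // same session as the previous event
                new Event(2_000L, "u2", "A"),
                new Event(400_000L, "u1", "A")  // more than 5 minutes later: new session
        );
        // Window [0, 300000) contains the session starts of u1 and u2.
        System.out.println(count(events).get("A").get(0L)); // prints 2
    }
}
```

Because the sketch counts only session starts, a user who stays at a location longer than one window length contributes only to the windows that contain the first event of their session. With a 5-minute gap and a 5-minute window, two session starts of the same user at the same location can never fall into the same window, so each count is also a count of distinct users.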
>>>>>>
>>>>>> So I created a custom Trigger, which is the same as CountTrigger but
>>>>>> has the following `onElement` method:
>>>>>>
>>>>>> @Override
>>>>>> public TriggerResult onElement(Object element, long timestamp, W window,
>>>>>>         TriggerContext ctx) throws Exception {
>>>>>>     // Element count for this window, kept in trigger state as in CountTrigger
>>>>>>     ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>>>>     count.add(1L);
>>>>>>     if (count.get() == maxCount) {
>>>>>>         // Emit the window once it reaches maxCount elements (maxCount = 1: first event)
>>>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>>>     } else if (count.get() > maxCount) {
>>>>>>         // Discard later elements of the same window without emitting them
>>>>>>         return TriggerResult.PURGE;
>>>>>>     }
>>>>>>     return TriggerResult.CONTINUE;
>>>>>> }
>>>>>>
>>>>>> But my final SlidingEventTimeWindow does not fire properly. I assume
>>>>>> this is because some users have session windows that are not closed,
>>>>>> so the watermark for those keys is running behind, and the
>>>>>> SlidingEventTimeWindow watermark is held back too.
>>>>>>
>>>>>> What I feel like I want to achieve is essentially setting the
>>>>>> watermark of the SlidingEventTimeWindow operator to be the maximum (with
>>>>>> lateness) of the input keys, rather than the minimum, but I cannot tell
>>>>>> if this is possible, and if not, what another approach could be.
>>>>>>
>>>>>> Thanks,
>>>>>> Padarn
>>>>>>
>>>>>
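Till's explanation in this thread (watermarks are global rather than per key, a keyBy sends each watermark to all downstream consumers, and an operator with several inputs takes the minimum of its inputs' watermarks) can be illustrated with a small plain-Java model. The `Tracker` class below is a hypothetical toy, not Flink's internal implementation, which also has to deal with details such as idle inputs; it only shows the min-over-channels rule. Note that the model holds no per-key state at all: an open session window for one user does not by itself hold back the downstream watermark; only an input channel whose watermark stops advancing does. Taking the maximum instead of the minimum, as wished for above, would let the operator's event time run ahead of a slower input, whose records would then arrive late.

```java
import java.util.Arrays;

// Toy model of the "minimum of all inputs" watermark rule; not Flink's implementation.
public class MinWatermarkSketch {

    // Hypothetical tracker for one downstream operator instance with N input channels.
    public static final class Tracker {
        private final long[] channelWatermarks;
        private long current = Long.MIN_VALUE;

        public Tracker(int numChannels) {
            channelWatermarks = new long[numChannels];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark seen yet
        }

        // Records a watermark arriving on one channel and returns the operator's event time.
        public long onWatermark(int channel, long watermark) {
            // A channel's watermark never moves backwards.
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            // The operator's event time is the minimum over all channels; keys play no role.
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            current = Math.max(current, min);
            return current;
        }
    }

    public static void main(String[] args) {
        // Two upstream subtasks feeding one downstream window operator after a keyBy.
        Tracker tracker = new Tracker(2);
        System.out.println(tracker.onWatermark(0, 10_000)); // Long.MIN_VALUE: channel 1 has not reported yet
        System.out.println(tracker.onWatermark(1, 4_000));  // 4000
        System.out.println(tracker.onWatermark(1, 12_000)); // 10000: channel 0 is now the slowest
    }
}
```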