Hi Everyone, I am trying to execute this simple sessionization pipeline, with the allowed lateness shown below:

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      env.setParallelism(2)

      // Source that replays a fixed list of events, each followed by a watermark.
      val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
        lazy val input: Seq[Event] = Seq(
          Event("u1", "e1", 1L),
          Event("u1", "e5", 6L),
          Event("u1", "e7", 11L),
          Event("u1", "e8", 12L),
          Event("u1", "e9", 16L),
          Event("u1", "e11", 14L),
          Event("u1", "e12", 8L),   // <-- e12, the event in question
          Event("u1", "e13", 20L)
        )

        override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
          input.foreach(event => {
            ctx.collectWithTimestamp(event, event.timestamp)
            // Watermark trails each event by 1 ms.
            ctx.emitWatermark(new Watermark(event.timestamp - 1))
          })
          ctx.emitWatermark(new Watermark(Long.MaxValue))
        }

        override def cancel(): Unit = {}
      })

      val tag: OutputTag[Event] = OutputTag("late-data")

      val sessionizedStream: DataStream[Event] = source
        .keyBy(item => item.userId)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))   // session gap: 3 ms
        .sideOutputLateData(tag)
        .allowedLateness(Time.milliseconds(2L))                           // allowed lateness: 2 ms
        .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {
          override def process(key: String, context: Context,
                               elements: Iterable[Event], out: Collector[Event]): Unit = {
            val sessionIdForWindow =
              key + "-" + context.currentWatermark + "-" + context.window.getStart
            elements.toSeq
              .sortBy(event => event.timestamp)
              .foreach(event => {
                out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size))
              })
          }
        })

      // Only the late-data side output is printed.
      sessionizedStream.getSideOutput(tag).print()
      env.execute()
    }

But here's the problem. I am expecting the event marked above (e12) to be collected in the side output as a late event, but it isn't: the event is not printed. What's interesting is that if I make *any one* of the following changes, e12 is considered late and is printed.

1) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L)
2) Change allowedLateness(Time.milliseconds(2L)) to allowedLateness(Time.milliseconds(1L))
3) Change Event("u1", "e12", 8L) to Event("u1", "e12", 7L) AND change allowedLateness(Time.milliseconds(2L)) to allowedLateness(Time.milliseconds(4L)) // or anything less than 7L

Can someone explain what's going on? What am I missing here?

Regards,
Indraneel
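
For reference, the arithmetic behind the cases above can be sketched with a small stand-alone model. This is not Flink code, and every name in it (LatenessSketch, Win, simulate) is hypothetical. It assumes that each element first gets its own window [ts, ts + gap), that windows which overlap or merely touch are merged, that a window counts as late once maxTimestamp + allowedLateness <= current watermark, and that the watermark never moves backwards. The final Long.MaxValue watermark is not modeled.

```scala
// Simplified stand-alone model of event-time session windows with allowed lateness.
// Not Flink code: all names here are hypothetical and exist only for illustration.
object LatenessSketch {
  // Half-open window [start, end).
  final case class Win(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    // Assumption: windows that merely touch (end == other.start) also count as intersecting.
    def intersects(o: Win): Boolean = start <= o.end && end >= o.start
    def cover(o: Win): Win = Win(math.min(start, o.start), math.max(end, o.end))
  }

  /** Returns the timestamps that this model classifies as late. */
  def simulate(timestamps: Seq[Long], gap: Long, lateness: Long): Seq[Long] = {
    var watermark = Long.MinValue
    var live = List.empty[Win]          // windows whose state has not been cleaned up yet
    val late = Seq.newBuilder[Long]
    for (ts <- timestamps) {
      val own = Win(ts, ts + gap)
      // Merge the element's own window with every live window it intersects.
      val (hit, rest) = live.partition(_.intersects(own))
      val merged = hit.foldLeft(own)(_ cover _)
      // Lateness is checked on the merged window, not on the element alone.
      if (merged.maxTimestamp + lateness <= watermark) late += ts
      else live = merged :: rest
      // As in the source above: watermark = timestamp - 1, never moving backwards.
      watermark = math.max(watermark, ts - 1)
      // Drop windows whose cleanup time has been reached.
      live = live.filter(_.maxTimestamp + lateness > watermark)
    }
    late.result()
  }

  def main(args: Array[String]): Unit = {
    val original = Seq(1L, 6L, 11L, 12L, 16L, 14L, 8L, 20L)   // e12 at 8
    val shifted  = Seq(1L, 6L, 11L, 12L, 16L, 14L, 7L, 20L)   // e12 at 7
    println("original:   " + simulate(original, 3L, 2L).mkString(","))  // nothing late
    println("change 1:   " + simulate(shifted, 3L, 2L).mkString(","))   // 7 is late
    println("change 2:   " + simulate(original, 3L, 1L).mkString(","))  // 8 is late
    println("change 3:   " + simulate(shifted, 3L, 4L).mkString(","))   // 7 is late
  }
}
```

Under these assumptions the model reproduces all four observations. With the original settings, e12's own window [8, 11) touches a session [11, 19) that is still live at watermark 15, so it is merged into that session instead of being treated as late. With timestamp 7 the window [7, 10) touches nothing and is late on its own. With allowed lateness 1, the session [11, 15) has already been cleaned up by the time e11 arrives, so the surviving session starts at 14 and [8, 11) no longer touches it. Whether Flink's window operator behaves exactly this way is an assumption of the sketch, not something the code above confirms.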