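Hi Indraneel,

In your case, ("u1", "e12", 8L) is not considered late. It goes into the session window {e7,e8,e9,e11} (range=11~19). e12 first gets its own window [8, 11), and 8 + 3 (session gap) >= 11, the lower bound of the existing session window, so the two windows touch and are merged into [8, 19). The merged window's cleanup time is 18 + 2 = 20, which is still ahead of the current watermark 15. That watermark was emitted after e9; the watermark 13 emitted after e11 does not move it back. So nothing is dropped, and e12 never reaches the side output.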
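The merge and lateness rules can be replayed with a small plain-Scala model of what the window operator does for this job. It is a simplified sketch, not Flink code, and it assumes the following: a single key, events processed one at a time, watermark ts - 1 emitted after each event as your source does, watermarks that would move backwards ignored, and window firing left out. As far as I understand Flink's WindowOperator, a window is late when window.maxTimestamp + allowedLateness <= watermark. An element goes to the side output only when its (merged) window is late and element.timestamp + allowedLateness <= watermark. The names SessionLatenessSim, SessionWindow and lateEvents are made up for illustration.

```scala
import scala.collection.mutable.ListBuffer

// Simplified, dependency-free model of a merging event-time session window
// operator for a single key (not Flink code; all names are hypothetical).
object SessionLatenessSim {

  // Half-open window [start, end), like Flink's TimeWindow.
  final case class SessionWindow(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
    // Same test as TimeWindow.intersects: windows that merely touch also merge.
    def intersects(o: SessionWindow): Boolean = start <= o.end && end >= o.start
    def cover(o: SessionWindow): SessionWindow =
      SessionWindow(math.min(start, o.start), math.max(end, o.end))
  }

  /** Replays (name, timestamp) events, emitting watermark ts - 1 after each
    * one, and returns the names that would reach the late-data side output. */
  def lateEvents(events: Seq[(String, Long)], gap: Long, lateness: Long): List[String] = {
    var watermark = Long.MinValue
    var live = List.empty[SessionWindow] // windows not yet cleaned up
    val late = ListBuffer.empty[String]

    for ((name, ts) <- events) {
      // 1. Give the element its own window, then merge every live window it touches.
      var merged = SessionWindow(ts, ts + gap)
      var changed = true
      while (changed) {
        val (touching, rest) = live.partition(_.intersects(merged))
        merged = touching.foldLeft(merged)(_ cover _)
        live = rest
        changed = touching.nonEmpty
      }
      // 2. A window is late once its cleanup time (maxTimestamp + lateness) has passed.
      if (merged.maxTimestamp + lateness <= watermark) {
        // The element is skipped; it is side-output only if it is late itself.
        if (ts + lateness <= watermark) late += name
      } else {
        live = merged :: live
      }
      // 3. Advance the watermark (never backwards) and purge expired windows.
      watermark = math.max(watermark, ts - 1)
      live = live.filter(w => w.maxTimestamp + lateness > watermark)
    }
    late.toList
  }

  // The input sequence from your source, in arrival order.
  val events: Seq[(String, Long)] = Seq(
    "e1" -> 1L, "e5" -> 6L, "e7" -> 11L, "e8" -> 12L,
    "e9" -> 16L, "e11" -> 14L, "e12" -> 8L, "e13" -> 20L)

  // Same stream with e12 moved from timestamp 8 to 7.
  val e12At7: Seq[(String, Long)] = events.map {
    case ("e12", _) => "e12" -> 7L
    case other      => other
  }

  def main(args: Array[String]): Unit = {
    def show(label: String, evs: Seq[(String, Long)], lateness: Long): Unit =
      println(label + ": " + lateEvents(evs, 3L, lateness).mkString("[", ", ", "]"))
    show("original (e12@8, lateness 2)", events, 2L) // []
    show("1) e12@7, lateness 2", e12At7, 2L)         // [e12]
    show("2) e12@8, lateness 1", events, 1L)         // [e12]
    show("3) e12@7, lateness 4", e12At7, 4L)         // [e12]
  }
}
```

Under these assumptions the model reproduces what you observed: nothing is late for the original job, and e12 is late in each of the three variants. It also agrees with your remark that, for e12 at 7, any allowed lateness below 7 makes it late.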
Regarding your 3 questions:

>> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)

7 + 3 < 11, so e12's own window [7, 10) does not touch the session window {e7,e8,e9,e11} and is not merged into it. That window's cleanup time (9 + 2 = 11) is already behind the watermark 15, so e12 is dropped and emitted to the late-data side output.

>> 2) allowedLateness(Time.milliseconds(2L)) change to allowedLateness(Time.milliseconds(1L))

Reducing the allowed lateness causes window {e7,e8} (range=11~15) to be purged when the watermark 15 that follows e9 arrives. Its cleanup time is 14 + 1 = 15; with 2 ms it was 16, so the window was still alive. So e11 only merges with e9, and when e12 arrives the existing session window is {e9,e11} (range=14~19). e12's window [8, 11) does not touch it, and its cleanup time (10 + 1 = 11) is behind the watermark, so e12 is considered late in this case.

>> 3) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND allowedLateness(Time.milliseconds(2L)) change to allowedLateness(Time.milliseconds(4L))

The same as case 1): [7, 10) does not touch [11, 19), and its cleanup time 9 + 4 = 13 is behind the watermark 15.

Thanks,
Zhu Zhu

Indraneel R <vascodaga...@gmail.com> wrote on Thu, Sep 26, 2019 at 2:24 AM:

> Hi Everyone,
>
> I am trying to execute this simple sessionization pipeline, with the
> allowed lateness shown below:
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
>
> // Assumed shape of Event (not shown in the original message);
> // the field name eventId and the default values are hypothetical.
> case class Event(userId: String, eventId: String, timestamp: Long,
>                  sessionId: String = "", count: Int = 0)
>
> object SessionizationJob { // hypothetical wrapper name
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setParallelism(2)
>
>     val source: DataStream[Event] = env.addSource(new SourceFunction[Event] {
>       lazy val input: Seq[Event] = Seq(
>         Event("u1", "e1", 1L),
>         Event("u1", "e5", 6L),
>         Event("u1", "e7", 11L),
>         Event("u1", "e8", 12L),
>         Event("u1", "e9", 16L),
>         Event("u1", "e11", 14L),
>         Event("u1", "e12", 8L), // <- e12, highlighted in red
>         Event("u1", "e13", 20L)
>       )
>
>       override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
>         input.foreach { event =>
>           ctx.collectWithTimestamp(event, event.timestamp)
>           // Watermark just behind each element's own timestamp
>           ctx.emitWatermark(new Watermark(event.timestamp - 1))
>         }
>         ctx.emitWatermark(new Watermark(Long.MaxValue))
>       }
>
>       override def cancel(): Unit = {}
>     })
>
>     val tag: OutputTag[Event] = OutputTag("late-data")
>
>     val sessionizedStream: DataStream[Event] = source
>       .keyBy(item => item.userId)
>       .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
>       .sideOutputLateData(tag)
>       .allowedLateness(Time.milliseconds(2L))
>       .process(new ProcessWindowFunction[Event, Event, String, TimeWindow] {
>         override def process(key: String, context: Context,
>                              elements: Iterable[Event], out: Collector[Event]): Unit = {
>           val sessionIdForWindow =
>             key + "-" + context.currentWatermark + "-" + context.window.getStart
>           // Emit the session's events in timestamp order, tagged with the session id
>           elements.toSeq
>             .sortBy(event => event.timestamp)
>             .foreach(event =>
>               out.collect(event.copy(sessionId = sessionIdForWindow, count = elements.size)))
>         }
>       })
>
>     sessionizedStream.getSideOutput(tag).print()
>     env.execute()
>   }
> }
>
> But here's the problem. I am expecting the event highlighted in red
> above (e12) to be collected in the side output as a late event.
>
> But it isn't. The event is not printed.
>
> What's interesting is, if I make *any one* of the following changes, the
> event e12 is considered late and is printed.
> 1) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L)
> 2) allowedLateness(Time.milliseconds(2L)) change
> to allowedLateness(Time.milliseconds(1L))
> 3) Event("u1", "e12", 8L) change to Event("u1", "e12", 7L) AND
> allowedLateness(Time.milliseconds(2L)) change to
> allowedLateness(Time.milliseconds(4L)) // or anything less than 7L
>
> Can someone explain what's going on? What am I missing here?
>
> regards
> -Indraneel