Hello, I am stuck with a weird problem and not able to wrap my head around it.
Here is my pipeline: SingleOutputStreamOperator<Data> data = flattenedPlayerStatsData .keyBy(new KeySelector()) .window(TumblingEventTimeWindows.of(Time.seconds(300))) .sideOutputLateData(lateOutputTag) .reduce(new MyReducer()) .name("reduce"); DataStream<Data> lateData = data.getSideOutput(lateOutputTag); // re-process late data SingleOutputStreamOperator<Data> reducedLateData = lateData .assignTimestampsAndWatermarks( WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(600)) .withTimestampAssigner((event, timestamp) -> event != null ? event.timestamp : timestamp)) .startNewChain() .keyBy(new KeySelector()) .window(TumblingEventTimeWindows.of(Time.seconds(300))) .reduce(new MyReducer()) .name("lateReduce"); // add sink reducedLateData .keyBy(new KeySelector()) .addSink(new MySink<>()) .name("lateSink"); What I observe is that the *lateReduce* step is receiving incoming records but it is not outputting the reduced records to the *lateSink* step. It seems to be accumulating late records forever. Is there any issue with the *Timestamps/Watermarks* step ? I see that this step is also receiving the records and outputting the same numbers of records to the *lateReduce* step. Please let me know what I may be doing wrong. If I don't assign fresh timestamp and watermarking to the *reducedLateData* stream then I notice that this *lateReduce* step now drops late records which were before dropped by *reduce* step. Thanks Sachin