I am stuck with a weird problem and not able to wrap my head around it.

Here is my pipeline:

SingleOutputStreamOperator<Data> data =
        .keyBy(new KeySelector())
        .reduce(new MyReducer())

DataStream<Data> lateData = data.getSideOutput(lateOutputTag);

// re-process late data
  SingleOutputStreamOperator<Data> reducedLateData =
                  .withTimestampAssigner((event, timestamp) -> event
!= null ? event.timestamp : timestamp))
          .keyBy(new KeySelector())
          .reduce(new MyReducer())
// add sink
      .keyBy(new KeySelector())
      .addSink(new MySink<>())

What I observe is that the *lateReduce* step is receiving incoming records
but it is not outputting the reduced records to the *lateSink* step.
It seems to be accumulating late records forever.

Is there any issue with the *Timestamps/Watermarks* step ?

I see that this step is also receiving the records and outputting the same
numbers of records to the *lateReduce* step.

Please let me know what I may be doing wrong.
If I don't assign fresh timestamp and watermarking to the *reducedLateData*
stream then I notice that this *lateReduce* step now drops late records
which were before dropped by *reduce* step.


