Hello,
I am stuck on a weird problem and cannot wrap my head around it.

Here is my pipeline:

SingleOutputStreamOperator<Data> data =
    flattenedPlayerStatsData
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyReducer())
        .name("reduce");

DataStream<Data> lateData = data.getSideOutput(lateOutputTag);

// re-process late data
SingleOutputStreamOperator<Data> reducedLateData =
    lateData
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(600))
                .withTimestampAssigner(
                    (event, timestamp) -> event != null ? event.timestamp : timestamp))
        .startNewChain()
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .reduce(new MyReducer())
        .name("lateReduce");
// add sink
reducedLateData
    .keyBy(new KeySelector())
    .addSink(new MySink<>())
    .name("lateSink");

What I observe is that the *lateReduce* step is receiving the incoming records
but it never emits any reduced records to the *lateSink* step.
It seems to be accumulating the late records forever.
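
My only guess so far: the late stream is presumably sparse, so maybe the
watermark never advances far enough (or some parallel subtasks of the
assigner see no data at all) to close the 300-second windows. Would adding
an idleness timeout to the strategy help? Something like this, where the
one-minute value is just a placeholder:

// Same strategy as in the pipeline above, plus an idleness timeout so the
// watermark can still advance when parts of the late stream go quiet
WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(600))
    .withIdleness(Duration.ofMinutes(1))
    .withTimestampAssigner(
        (event, timestamp) -> event != null ? event.timestamp : timestamp)

My understanding is that subtasks which stop seeing late records would then
be marked idle and should not hold the watermark back, but I am not sure.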

Is there any issue with the *Timestamps/Watermarks* step?

I see that this step is also receiving the records and forwarding the same
number of records to the *lateReduce* step.
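
To check whether the watermark actually advances past the window end, I
could add a probe right after the fresh watermarks are assigned
(*lateDataWithWatermarks* is just a made-up name here for the result of the
assignTimestampsAndWatermarks call):

// Debugging sketch: log the event timestamp and the current watermark for
// every late record before it enters the lateReduce window
SingleOutputStreamOperator<Data> probed =
    lateDataWithWatermarks.process(new ProcessFunction<Data, Data>() {
        @Override
        public void processElement(Data value, Context ctx, Collector<Data> out) {
            System.out.println("ts=" + ctx.timestamp()
                + ", watermark=" + ctx.timerService().currentWatermark());
            out.collect(value);
        }
    });
// then keyBy / window / reduce as before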

Please let me know what I may be doing wrong.
One more observation: if I don't assign fresh timestamps and watermarks to
the late stream at all, then the *lateReduce* step instead drops the late
records, i.e. the same records that were already dropped as late by the
*reduce* step (variant sketched below).
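
For clarity, that variant looks like this (everything else unchanged):

// Variant without fresh timestamps/watermarks on the late stream
SingleOutputStreamOperator<Data> reducedLateData =
    lateData
        .keyBy(new KeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(300)))
        .reduce(new MyReducer())
        .name("lateReduce");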

Thanks
Sachin
