Ah, sorry for the compile issue.

I wasn't able to reproduce the issue; conceptually your code looks fine.

Can you provide us with a self-contained reproducer for the issue?

For reference, here's the test I used; you can adjust it as necessary to replicate your use case:

@Test
public void testWindowLateDataSideOutput() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final OutputTag<Integer> lateTag = new OutputTag<>("tag") {};

    final SingleOutputStreamOperator<Integer> windowedStream =
            env.addSource(new MySource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                    .keyBy(x -> x)
                    .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                    .allowedLateness(Time.of(1, TimeUnit.SECONDS))
                    .sideOutputLateData(lateTag)
                    .trigger(new MyTrigger())
                    .reduce(Integer::sum);

    final DataStream<Integer> lateStream = windowedStream.getSideOutput(lateTag);
    lateStream.map(x -> "late: " + x).print();

    env.execute();
}

private static class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final long lateTime = 1000;
        long currentTime = 5000;
        while (true) {
            ctx.collectWithTimestamp(1, lateTime);
            ctx.collectWithTimestamp(2, currentTime);
            currentTime += 100;
        }
    }

    @Override
    public void cancel() {}
}
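
In case it helps when adjusting the test, here is a rough plain-Java sketch of why the element with timestamp 1000 should count as late there. It uses no Flink classes. The class and method names (LatenessSketch, windowStart, maxTimestamp, isWindowLate) are made up for illustration, and the rule it encodes (a window is late once the watermark has reached its last timestamp plus the allowed lateness) is my reading of the window operator, so treat it as an assumption. The watermark value of 4999 is also an assumption: the monotonous-timestamps strategy trailing the highest timestamp seen (5000) by 1 ms.

```java
// Plain-Java sketch of the lateness arithmetic; no Flink classes involved.
public class LatenessSketch {

    // Start of the tumbling window containing a non-negative timestamp (offset 0 assumed).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Last timestamp that still belongs to the window starting at windowStart.
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    // Assumed rule: the window is late once the watermark has reached
    // maxTimestamp + allowedLateness.
    static boolean isWindowLate(long timestamp, long windowSize, long allowedLateness, long watermark) {
        long cleanupTime = maxTimestamp(windowStart(timestamp, windowSize), windowSize) + allowedLateness;
        return cleanupTime <= watermark;
    }

    public static void main(String[] args) {
        long windowSize = 1000;      // 1 second tumbling window, as in the test
        long allowedLateness = 1000; // 1 second, as in the test
        long watermark = 5000 - 1;   // assumption: trails the highest seen timestamp by 1 ms

        // Element 1 (timestamp 1000): window [1000, 2000), cleanup time 2999.
        System.out.println(isWindowLate(1000, windowSize, allowedLateness, watermark));
        // Element 2 (timestamp 5000): window [5000, 6000), cleanup time 6999.
        System.out.println(isWindowLate(5000, windowSize, allowedLateness, watermark));
    }
}
```

With these numbers the first check is true and the second is false, so under the assumed rule only the timestamp-1000 element would be routed to the late side output.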


On 5/11/2021 6:42 PM, Slotterback, Chris wrote:

Hi Chesnay,

That doesn’t compile: WindowedStream doesn’t have the getSideOutput operation; only SingleOutputStreamOperator does.

Chris

*From: *Chesnay Schepler <ches...@apache.org>
*Date: *Tuesday, May 11, 2021 at 6:09 AM
*To: *"Slotterback, Chris" <chris_slotterb...@comcast.com>, "user@flink.apache.org" <user@flink.apache.org>
*Subject: *[EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

Please try this:

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)

val lateStream =
  windowedStream.getSideOutput(lateTag);
val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:

    Hey Flink Users,

    I am having some issues with getting sideOutputLateData to
    properly function with late event time reports. I have the
    following code that, per my understanding, should be allowing
    reports that fall after the window has triggered and beyond
    allowed lateness to pass through to the side output:

    val lateTag = new OutputTag[…]("tag"){}

    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)
      .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

    val lateStream =
      windowedStream.getSideOutput(lateTag);

    trigger:

    public class myTrigger extends Trigger<…>, Window> {

      @Override
      public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
      }

      @Override
      public TriggerResult onProcessingTime(…) throws Exception {
        throw new Exception("processing time not supported");
      }

      @Override
      public TriggerResult onEventTime(…) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
      }

    The main flow is functional when all reports are on time, but when
    I start introducing late reports, they get rejected by the window
    but fail to end up in the late stream. Is there something
    off with my understanding?

    Chris

