Chesnay,

Thanks for the help. I used your example as a baseline for mine and got it 
working. For anyone who finds this in the future: I had an 
assignTimestampsAndWatermarks call (along with name and uid calls) attached to 
the end of the stream I was calling getSideOutput on. I had wrongly assumed 
that those three calls had no effect on accessing side outputs, but the 
watermark assigner seems to modify the chain and prevents access to the late 
side output once it is appended. I moved the watermark assigner further down 
the chain, after accessing the side output, and it now works as expected.

Thanks for the help!
Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Wednesday, May 12, 2021 at 5:24 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports 
once window expires

Ah, sorry for the compile issue.

I wasn't able to reproduce the issue; conceptually your code looks fine.

Can you provide us with a self-contained reproducer for the issue?

For reference, here's the test I used, which you can adjust as necessary to 
replicate your use case:

@Test
public void testWindowLateDataSideOutput() throws Exception {
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    final OutputTag<Integer> lateTag = new OutputTag<>("tag") {};

    final SingleOutputStreamOperator<Integer> windowedStream =
            env.addSource(new MySource())
                    .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                    .keyBy(x -> x)
                    .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                    .allowedLateness(Time.of(1, TimeUnit.SECONDS))
                    .sideOutputLateData(lateTag)
                    .trigger(new MyTrigger())
                    .reduce(Integer::sum);

    final DataStream<Integer> lateStream = windowedStream.getSideOutput(lateTag);
    lateStream.map(x -> "late: " + x).print();

    env.execute();
}

private static class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final long lateTime = 1000;
        long currentTime = 5000;

        while (true) {
            ctx.collectWithTimestamp(1, lateTime);
            ctx.collectWithTimestamp(2, currentTime);
            currentTime += 100;
        }
    }

    @Override
    public void cancel() {}
}
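
To illustrate why the element with timestamp 1000 in the test above is expected to land in the late side output, here is a minimal plain-Java sketch of the lateness rule. It is a simplified model and not Flink's own code: the names LatenessSketch, windowStart, windowMaxTimestamp and isLate are hypothetical, and it assumes a window offset of 0, non-negative timestamps, and a watermark equal to the highest timestamp seen so far minus 1.

```java
// Simplified model of an event-time lateness check; not Flink's actual code.
final class LatenessSketch {

    // Start of the tumbling window containing the timestamp
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long timestamp, long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs - 1;
    }

    // An element is treated as late once the watermark has reached the
    // window's max timestamp plus the allowed lateness.
    static boolean isLate(long timestamp, long windowSizeMs,
                          long allowedLatenessMs, long watermark) {
        return windowMaxTimestamp(timestamp, windowSizeMs) + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long windowSizeMs = 1000;      // 1 second tumbling window, as in the test
        long allowedLatenessMs = 1000; // 1 second allowed lateness, as in the test
        long watermark = 5000 - 1;     // assumed: highest timestamp seen (5000) minus 1

        // Window [1000, 2000): 1999 + 1000 = 2999 <= 4999, so the element is late.
        System.out.println(isLate(1000, windowSizeMs, allowedLatenessMs, watermark));
        // Window [5000, 6000): 5999 + 1000 = 6999 > 4999, so the element is on time.
        System.out.println(isLate(5000, windowSizeMs, allowedLatenessMs, watermark));
    }
}
```

Under these assumptions the sketch prints true for the element at 1000 and false for the element at 5000, which matches the intent of the test: the first element should go to the late stream and the second into a regular window.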

On 5/11/2021 6:42 PM, Slotterback, Chris wrote:
Hi Chesnay,

That doesn’t compile: WindowedStream doesn’t have a getSideOutput method; 
only SingleOutputStreamOperator does.

Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once 
window expires

Please try this:

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)


val lateStream = windowedStream.getSideOutput(lateTag);

val aggregatedStream = windowedStream.aggregate(
  new VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:
Hey Flink Users,

I am having some issues with getting sideOutputLateData to properly function 
with late event time reports. I have the following code that, per my 
understanding, should be allowing reports that fall after the window has 
triggered and beyond allowed lateness to pass through to the side output:


val lateTag = new OutputTag[…]("tag"){}

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))


val lateStream = windowedStream.getSideOutput(lateTag);


trigger:

public class myTrigger extends Trigger<…>, Window> {
    @Override
    public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(…) throws Exception {
        throw new Exception("processing time not supported");
    }

    @Override
    public TriggerResult onEventTime(…) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }
}

The main flow works when all reports are on time, but when I start 
introducing late reports, they are rejected by the window yet fail to end up 
in the late stream. Is there something off with my understanding?

Chris



