Chesnay, thanks for the help. I used your example as a baseline for mine and got it working. For anyone who may see this in the future: I actually had an assignTimestampsAndWatermarks call (along with name and uid calls) attached to the end of the stream I was calling getSideOutput on. I wrongly assumed those three calls had no effect on accessing side outputs. In fact, assignTimestampsAndWatermarks returns a new operator, so getSideOutput was being called on the watermark assigner instead of on the window operator, and the window's late side output is not reachable from there. (name and uid only configure the operator they are called on and return that same operator.) I moved the watermark assigner further down the chain, after the side output is accessed, and it is working as expected now.
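To make the root cause concrete, here is a toy model in plain Java. It is not Flink code: the Operator class and its then, name and emitToSideOutput methods are hypothetical stand-ins. It assumes that a transformation such as assignTimestampsAndWatermarks yields a new operator, that name/uid return the same operator, and that a side output can only be read from the operator that emitted it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Flink code: every operator keeps its own side outputs,
// and getSideOutput only looks at the operator it is called on.
public class SideOutputModel {

    static final class Operator {
        String displayName;
        Operator upstream; // the operator this one reads its main input from
        // tag -> timestamps this operator emitted under that tag
        private final Map<String, List<Long>> sideOutputs = new HashMap<>();

        Operator(String displayName) {
            this.displayName = displayName;
        }

        // Hypothetical stand-in for a transformation such as
        // assignTimestampsAndWatermarks: returns a NEW downstream operator.
        Operator then(String displayName) {
            Operator next = new Operator(displayName);
            next.upstream = this;
            return next;
        }

        // Hypothetical stand-in for name()/uid(): configures this operator
        // and returns it unchanged.
        Operator name(String displayName) {
            this.displayName = displayName;
            return this;
        }

        void emitToSideOutput(String tag, long timestamp) {
            sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(timestamp);
        }

        // Only side outputs emitted by THIS operator are visible here.
        List<Long> getSideOutput(String tag) {
            return sideOutputs.getOrDefault(tag, Collections.emptyList());
        }
    }

    public static void main(String[] args) {
        Operator window = new Operator("window");
        // The window operator routes a late record (timestamp 1000) to "late".
        window.emitToSideOutput("late", 1000L);

        // name() keeps the same operator, so the late data is still visible.
        Operator renamed = window.name("my-window");
        // Appending the watermark assigner yields a different operator.
        Operator assigner = window.then("assignTimestampsAndWatermarks");

        System.out.println("window:   " + window.getSideOutput("late"));
        System.out.println("renamed:  " + renamed.getSideOutput("late"));
        System.out.println("assigner: " + assigner.getSideOutput("late"));
    }
}
```

In this model the late record is visible through window and renamed (the same object), while assigner reports an empty list, which matches the symptom described in this thread.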
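Thanks for the help!
Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Wednesday, May 12, 2021 at 5:24 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

Ah, sorry for the compile issue.

I wasn't able to reproduce the issue; conceptually your code looks fine. Can you provide us with a self-contained reproducer for the issue?

For reference, here's the test I used, which you can adjust as necessary to replicate your use case:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;
    import org.junit.Test;

    @Test
    public void testWindowLateDataSideOutput() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Anonymous subclass so the tag's element type is captured (explicit type argument for Java 8).
        final OutputTag<Integer> lateTag = new OutputTag<Integer>("tag") {};

        final SingleOutputStreamOperator<Integer> windowedStream = env
                .addSource(new MySource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
                .keyBy(x -> x)
                .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
                .allowedLateness(Time.of(1, TimeUnit.SECONDS))
                .sideOutputLateData(lateTag)
                .trigger(new MyTrigger()) // a trigger like the one from the original question (not shown)
                .reduce(Integer::sum);

        // Read the late data from the window operator itself.
        final DataStream<Integer> lateStream = windowedStream.getSideOutput(lateTag);
        lateStream.map(x -> "late: " + x).print();

        env.execute();
    }

    private static class MySource implements SourceFunction<Integer> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            final long lateTime = 1000;
            long currentTime = 5000;
            while (running) {
                // Once the watermark has advanced, records at timestamp 1000 are late.
                ctx.collectWithTimestamp(1, lateTime);
                ctx.collectWithTimestamp(2, currentTime);
                currentTime += 100;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

On 5/11/2021 6:42 PM, Slotterback, Chris wrote:

Hi Chesnay,

That doesn't compile, as WindowedStream doesn't have a getSideOutput method; only SingleOutputStreamOperator has that operation.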
Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

Please try this:

    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)
    val lateStream = windowedStream.getSideOutput(lateTag);
    val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:

Hey Flink Users,

I am having some issues getting sideOutputLateData to function properly with late event-time reports. I have the following code that, per my understanding, should allow reports that arrive after the window has triggered and beyond the allowed lateness to pass through to the side output:

    val lateTag = new OutputTag[…]("tag"){}
    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)
      .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))
    val lateStream = windowedStream.getSideOutput(lateTag);

Trigger:

    public class myTrigger extends Trigger<…>, Window> {
        @Override
        public TriggerResult onElement(…) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(…) throws Exception {
            throw new Exception("processing time not supported");
        }

        @Override
        public TriggerResult onEventTime(…) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }

The main flow works when all reports are on time, but when I start introducing late reports, they are rejected by the window yet fail to end up in the late stream.
Is there something off with my understanding?

Chris
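For reference, the rule that decides whether a record lands in the late side output can be sketched as below. This is a simplified model in plain Java, not Flink code: it assumes a tumbling event-time window with offset 0, and the names classify and Outcome are hypothetical. As I understand Flink's window operator, a window is kept until the watermark reaches its last timestamp (end minus 1 ms) plus the allowed lateness; after that, records for it are dropped, or routed to the late-data side output if sideOutputLateData is set.

```java
// Simplified model of window lateness, NOT Flink code.
// Assumes tumbling event-time windows with offset 0.
public class LatenessModel {

    enum Outcome { ON_TIME, LATE_BUT_ALLOWED, SIDE_OUTPUT }

    // Start of the tumbling window [start, start + size) that contains timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    static Outcome classify(long timestamp, long size, long allowedLateness, long watermark) {
        // Last timestamp that still belongs to the window (window end minus 1 ms).
        long maxTimestamp = windowStart(timestamp, size) + size - 1;
        // Window state is kept until the watermark reaches this point.
        long cleanupTime = maxTimestamp + allowedLateness;
        if (cleanupTime <= watermark) {
            // Window already cleaned up: dropped, or side-output if sideOutputLateData is set.
            return Outcome.SIDE_OUTPUT;
        }
        if (maxTimestamp <= watermark) {
            // Watermark passed the window end, but the window is still kept;
            // whether it fires again depends on the trigger.
            return Outcome.LATE_BUT_ALLOWED;
        }
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        // Same window size, allowed lateness and late timestamp as the test in this thread.
        long size = 1000, lateness = 1000, ts = 1000;
        for (long wm : new long[] {1500, 2500, 2999, 4999}) {
            System.out.println("watermark " + wm + " -> " + classify(ts, size, lateness, wm));
        }
    }
}
```

With these values the record at timestamp 1000 belongs to the window [1000, 2000), so it is classified ON_TIME at watermark 1500, LATE_BUT_ALLOWED at 2500, and SIDE_OUTPUT from watermark 2999 onwards. In the test source, once the watermark has advanced well past 2999, every record at timestamp 1000 falls into the last case.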