Hi Chesnay,

That doesn’t compile: WindowedStream has no getSideOutput method; only SingleOutputStreamOperator provides it.
Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires

Please try this:

    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)

    val lateStream = windowedStream.getSideOutput(lateTag);

    val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:

Hey Flink Users,

I am having some issues getting sideOutputLateData to function properly with late event-time reports. I have the following code that, per my understanding, should allow reports that arrive after the window has triggered and beyond the allowed lateness to pass through to the side output:

    val lateTag = new OutputTag[…]("tag"){}

    val windowedStream = stream
      .keyBy(…)
      .window(TumblingEventTimeWindows.of(…))
      .allowedLateness(…)
      .sideOutputLateData(lateTag)
      .trigger(new myTrigger)
      .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

    val lateStream = windowedStream.getSideOutput(lateTag);

trigger:

    public class myTrigger extends Trigger<…>, Window> {

        @Override
        public TriggerResult onElement(…) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(…) throws Exception {
            throw new Exception("processing time not supported");
        }

        @Override
        public TriggerResult onEventTime(…) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }
    }

The main flow is functional when all reports are on time, but when I start introducing late reports, they are rejected by the window yet fail to end up in the late stream. Is there something off with my understanding?

Chris
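
For reference, the lateness rule the question relies on can be modeled without Flink. The sketch below is a simplified illustration, not Flink's actual code: the class and method names (LatenessSketch, windowStart, isWindowLate) are hypothetical, and it assumes tumbling windows with zero offset, non-negative millisecond timestamps, and a rule where a window counts as late once the watermark reaches the window's last timestamp plus the allowed lateness.

```java
public class LatenessSketch {

    // Start of the tumbling window that contains the timestamp
    // (assumes offset 0 and non-negative timestamps).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Assumed rule: the window is late once the watermark has reached
    // its last timestamp (end - 1) plus the allowed lateness.
    static boolean isWindowLate(long timestamp, long windowSize,
                                long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(timestamp, windowSize) + windowSize - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 60_000L;      // 1-minute tumbling window
        long lateness = 10_000L;  // 10 seconds of allowed lateness

        // An element at t=30s belongs to the window [0s, 60s).
        // Watermark 65s: still within the allowed lateness, so not late.
        System.out.println(isWindowLate(30_000L, size, lateness, 65_000L));
        // Watermark 70s: past the cleanup time, so the element is late and
        // would be a candidate for the side output.
        System.out.println(isWindowLate(30_000L, size, lateness, 70_000L));
    }
}
```

Under this model, only elements whose window has already passed its cleanup time qualify for the late side output. Elements that arrive within the allowed lateness are still handed to the window and its trigger.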