Ah, sorry for the compile issue.
I wasn't able to reproduce the problem; conceptually, your code looks fine.
Can you provide us with a self-contained reproducer?
For reference, here's the test I used, which you can adjust as
necessary to replicate your use case:
@Test
public void testWindowLateDataSideOutput() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final OutputTag<Integer> lateTag = new OutputTag<>("tag") {};

    final SingleOutputStreamOperator<Integer> windowedStream =
        env.addSource(new MySource())
            .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
            .keyBy(x -> x)
            .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
            .allowedLateness(Time.of(1, TimeUnit.SECONDS))
            .sideOutputLateData(lateTag)
            .trigger(new MyTrigger())
            .reduce(Integer::sum);

    final DataStream<Integer> lateStream = windowedStream.getSideOutput(lateTag);
    lateStream.map(x -> "late: " + x).print();

    env.execute();
}

private static class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        final long lateTime = 1000;
        long currentTime = 5000;
        while (true) {
            ctx.collectWithTimestamp(1, lateTime);
            ctx.collectWithTimestamp(2, currentTime);
            currentTime += 100;
        }
    }

    @Override
    public void cancel() {}
}
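
As a rough illustration of why the source above should produce late data, here is a Flink-free sketch of the arithmetic. It is not Flink's implementation. The class LateDataSketch and its method names are hypothetical, and it rests on two assumptions: that a window is treated as late once (window end - 1 ms + allowed lateness) <= current watermark, and that forMonotonousTimestamps keeps the watermark 1 ms behind the largest timestamp seen so far.

```java
// Flink-free sketch of the lateness arithmetic; these are not Flink classes.
final class LateDataSketch {

    // Start of the tumbling window containing `timestamp`
    // (assumes offset 0 and non-negative timestamps).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Largest timestamp that still belongs to the window [start, start + size).
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    // Assumption: a window counts as late once the watermark has reached
    // its cleanup time (max timestamp + allowed lateness).
    static boolean isWindowLate(long timestamp, long windowSize,
                                long allowedLateness, long watermark) {
        long start = windowStart(timestamp, windowSize);
        long cleanupTime = maxTimestamp(start, windowSize) + allowedLateness;
        return cleanupTime <= watermark;
    }

    public static void main(String[] args) {
        long windowSize = 1000;      // Time.of(1, TimeUnit.SECONDS)
        long allowedLateness = 1000; // Time.of(1, TimeUnit.SECONDS)
        long lateTime = 1000;        // timestamp of the "late" element in MySource
        long watermark = 5000 - 1;   // assumed: 1 ms behind the largest timestamp seen

        System.out.println("window start: " + windowStart(lateTime, windowSize));
        System.out.println("late: "
            + isWindowLate(lateTime, windowSize, allowedLateness, watermark));
    }
}
```

Under these assumptions the element with timestamp 1000 falls into the window [1000, 2000), whose cleanup time is 1999 + 1000 = 2999. Once the watermark has passed 2999, further elements with that timestamp would be expected on the side output. The very first such element arrives before any watermark has been emitted, so it would not count as late.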
On 5/11/2021 6:42 PM, Slotterback, Chris wrote:
Hi Chesnay,
That doesn’t compile, because WindowedStream doesn’t have a
getSideOutput method; only SingleOutputStreamOperator has that operation.
Chris
From: Chesnay Schepler <ches...@apache.org>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once window expires
Please try this:
val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
val lateStream = windowedStream.getSideOutput(lateTag)
val aggregatedStream = windowedStream.aggregate(
  new VSGBondingGroupMediansAggregator(packetFilterCount))
On 5/10/2021 9:56 PM, Slotterback, Chris wrote:
Hey Flink Users,
I am having some issues with getting sideOutputLateData to
properly function with late event time reports. I have the
following code that, per my understanding, should be allowing
reports that fall after the window has triggered and beyond
allowed lateness to pass through to the side output:
val lateTag = new OutputTag[…]("tag"){}
val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))
val lateStream = windowedStream.getSideOutput(lateTag)
trigger:
public class myTrigger extends Trigger<…>, Window> {
    @Override
    public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(…) throws Exception {
        throw new Exception("processing time not supported");
    }

    @Override
    public TriggerResult onEventTime(…) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }
}
The main flow is functional when all reports are on time, but when
I start introducing late reports, they are rejected by the window
but fail to end up in the late stream. Is there something
off with my understanding?
Chris