Hi, I have been trying to understand how triggers work in Flink. We have a set of data that arrives to us on Kafka. We need to process the data in a window when either one of the two criteria satisfy: 1) Max number of elements has reached in the window. 2) Some max time has elapsed (Say 5 milliseconds in our case).
I have written the following code: public class WindowTest { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() { @Override public void run(SourceContext<Long> ctx) throws Exception { LongStream.range(0, 102).forEach(ctx::collect); } @Override public void cancel() { } }); source.timeWindowAll(Time.milliseconds(5)).trigger(PurgingTrigger.of(CountTrigger.of(15))).apply(new AllWindowFunction<Long, Object, TimeWindow>() { @Override public void apply(TimeWindow timeWindow, Iterable<Long> values, Collector<Object> collector) throws Exception { System.out.println("processing a window"); System.out.println(Joiner.on(',').join(values)); } }).print(); env.execute("test-program"); } } Here is the output I get when I run this code: processing a window 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14 processing a window 15,16,17,18,19,20,21,22,23,24,25,26,27,28,29 processing a window 30,31,32,33,34,35,36,37,38,39,40,41,42,43,44 processing a window 45,46,47,48,49,50,51,52,53,54,55,56,57,58,59 processing a window 60,61,62,63,64,65,66,67,68,69,70,71,72,73,74 processing a window 75,76,77,78,79,80,81,82,83,84,85,86,87,88,89 As you can see, the data from 90 to 101 is not processed. Shouldn't it be processed when the window is completed after 5 ms? When I remove the trigger part, all of the data does get processed from 0 to 101. Any idea why do we see such a behaviour here? -- *Regards,Harshvardhan Agrawal* *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*