Hi,

I have been trying to understand how triggers work in Flink. We have data
arriving on Kafka, and we need to process a window as soon as either of two
criteria is satisfied:
1) The window has reached a maximum number of elements.
2) Some maximum amount of time has elapsed (say 5 milliseconds in our case).

I have written the following code:

import java.util.stream.LongStream;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import com.google.common.base.Joiner;

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Emit the numbers 0..101 and then finish.
        DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                LongStream.range(0, 102).forEach(ctx::collect);
            }

            @Override
            public void cancel() {
            }
        });

        // 5 ms tumbling windows, firing and purging every 15 elements.
        source.timeWindowAll(Time.milliseconds(5))
                .trigger(PurgingTrigger.of(CountTrigger.of(15)))
                .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<Long> values,
                            Collector<Object> collector) throws Exception {
                        System.out.println("processing a window");
                        System.out.println(Joiner.on(',').join(values));
                    }
                }).print();

        env.execute("test-program");
    }
}

Here is the output I get when I run this code:

processing a window
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
processing a window
15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
processing a window
30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
processing a window
45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
processing a window
60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
processing a window
75,76,77,78,79,80,81,82,83,84,85,86,87,88,89

As you can see, the data from 90 to 101 is never processed. Shouldn't it be
emitted when the window closes after 5 ms?

When I remove the .trigger(...) call, all of the data from 0 to 101 does get
processed.
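
My suspicion is that passing a custom trigger replaces the window's default
EventTimeTrigger, so the end-of-window firing at the 5 ms boundary never
happens and the leftover elements (fewer than 15 of them) just sit in the
window. If that is right, I imagine I would need a single trigger that fires
on whichever comes first, the count or the window end. Here is a rough,
untested sketch of what I have in mind (CountOrTimeTrigger is just a name I
made up):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires and purges when either the element count reaches maxCount or the
// watermark passes the end of the window, whichever comes first.
public class CountOrTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountOrTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    public static CountOrTimeTrigger of(long maxCount) {
        return new CountOrTimeTrigger(maxCount);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
            TriggerContext ctx) throws Exception {
        // Keep the time-based firing that the default EventTimeTrigger would give us.
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // The watermark passed the end of the window: flush whatever is left.
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

Is something like this the right direction, or is there a built-in way to
combine a count trigger and a time trigger?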

Any idea why we see this behaviour here?
-- 

Regards,
Harshvardhan Agrawal
267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>
