Ah, this works as expected, since Flink documentation states this:

By specifying a trigger using trigger() you are overwriting the default
> trigger of a WindowAssigner. For example, if you specify a CountTrigger for
> TumblingEventTimeWindows you will no longer get window firings based on the
> progress of time but only by count. Right now, you have to write your own
> custom trigger if you want to react based on both time and count.
>



On Tue, Jan 26, 2021 at 10:44 AM Marco Villalobos <mvillalo...@kineteque.com>
wrote:

> I wrote this simple test:
>
> .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
> .trigger(PurgingTrigger.of(CountTrigger.of(5)))
>
> Thinking that if I send 2 elements of data, it would collect them after a
> minute.
> But that doesn't seem to be happening.
>
> Is my understanding of how windows and triggers work correct?
>
> /**
>  * To test this job first in command line make a simple server on a terminal
>  *
>  * nc -l 8889
>  *
>  * Then start this job at the command line or in IDE. Then in the terminal 
> input each value by typing a line of text.
>  * Each line of text (excluding the new line) will be picked up by this job.
>  */
> public class TriggerTestJob {
>
>    public static void main(String args[]) throws Exception {
>       final StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>       
> streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>       streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>       
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
>
>       streamEnv.socketTextStream("localhost", 8889)
>             .map(value -> new Tuple2<String, String>("test", 
> value)).returns(new TypeHint<Tuple2<String, String>>(){})
>             .keyBy((KeySelector<Tuple2<String, String>, String>) value -> 
> value.f0)
>             .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>             .trigger(PurgingTrigger.of(CountTrigger.of(5)))
>             .process(new ProcessWindowFunction<Tuple2<String, String>, 
> Tuple2<String, String>, String, TimeWindow>() {
>                @Override
>                public void process(String key, Context context, 
> Iterable<Tuple2<String, String>> elements, Collector<Tuple2<String, String>> 
> out) throws Exception {
>                   for (Tuple2<String, String> element : elements) {
>                      out.collect(element);
>                   }
>                }
>             }).name("trigger")
>             .print();
>
>       streamEnv.execute("trigger test");
>    }
> }
>
>

Reply via email to