Ah, this works as expected, since Flink documentation states this: By specifying a trigger using trigger() you are overwriting the default > trigger of a WindowAssigner. For example, if you specify a CountTrigger for > TumblingEventTimeWindows you will no longer get window firings based on the > progress of time but only by count. Right now, you have to write your own > custom trigger if you want to react based on both time and count. >
On Tue, Jan 26, 2021 at 10:44 AM Marco Villalobos <mvillalo...@kineteque.com> wrote: > I wrote this simple test: > > .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) > .trigger(PurgingTrigger.of(CountTrigger.of(5))) > > Thinking that if I send 2 elements of data, it would collect them after a > minute. > But that doesn't seem to be happening. > > Is my understanding of how windows and triggers work correct? > > /** > * To test this job first in command line make a simple server on a terminal > * > * nc -l 8889 > * > * Then start this job at the command line or in IDE. Then in the terminal > input each value by typing a line of text. > * Each line of text (excluding the new line) will be picked up by this job. > */ > public class TriggerTestJob { > > public static void main(String args[]) throws Exception { > final StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > > streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); > streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); > > streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); > > streamEnv.socketTextStream("localhost", 8889) > .map(value -> new Tuple2<String, String>("test", > value)).returns(new TypeHint<Tuple2<String, String>>(){}) > .keyBy((KeySelector<Tuple2<String, String>, String>) value -> > value.f0) > .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) > .trigger(PurgingTrigger.of(CountTrigger.of(5))) > .process(new ProcessWindowFunction<Tuple2<String, String>, > Tuple2<String, String>, String, TimeWindow>() { > @Override > public void process(String key, Context context, > Iterable<Tuple2<String, String>> elements, Collector<Tuple2<String, String>> > out) throws Exception { > for (Tuple2<String, String> element : elements) { > out.collect(element); > } > } > }).name("trigger") > .print(); > > streamEnv.execute("trigger test"); > } > } > >