Hi Harshvardhan,

By specifying a trigger with trigger(), you override the default trigger
of the WindowAssigner. For example, timeWindowAll() with ingestion time
uses TumblingEventTimeWindows; if you specify a CountTrigger for it, the
window no longer fires based on the progress of time, only on the element
count. In your job the last 12 elements (90 to 101) never reach the count
of 15, so they are never emitted. Right now, you have to write your own
custom trigger if you want to react to both time and count.
More details here [1].
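To make the difference concrete, here is a rough plain-Java model (not
Flink code) of the two firing policies applied to the 102 elements from
your example. It assumes all elements fall into a single window and that
event time eventually passes the end of that window. ModelTrigger, Result
and the helper methods are hypothetical stand-ins loosely modelled on
Flink's Trigger and TriggerResult, not the real API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of trigger decisions for ONE window; not Flink code.
public class TriggerModel {

    // Hypothetical stand-in for Flink's TriggerResult (only two outcomes modelled).
    enum Result { CONTINUE, FIRE_AND_PURGE }

    // Hypothetical, simplified trigger: one callback per element and one when
    // event time passes the end of the window.
    interface ModelTrigger {
        Result onElement(int bufferedCount);
        Result onWindowEnd();
    }

    // Mirrors PurgingTrigger.of(CountTrigger.of(maxCount)): reacts to count only.
    static ModelTrigger countOnlyTrigger(int maxCount) {
        return new ModelTrigger() {
            public Result onElement(int bufferedCount) {
                return bufferedCount >= maxCount ? Result.FIRE_AND_PURGE : Result.CONTINUE;
            }
            public Result onWindowEnd() {
                return Result.CONTINUE; // the end of the window is ignored
            }
        };
    }

    // Count OR time: same count rule, plus a final firing when the window ends.
    static ModelTrigger countOrWindowEndTrigger(int maxCount) {
        return new ModelTrigger() {
            public Result onElement(int bufferedCount) {
                return bufferedCount >= maxCount ? Result.FIRE_AND_PURGE : Result.CONTINUE;
            }
            public Result onWindowEnd() {
                return Result.FIRE_AND_PURGE;
            }
        };
    }

    // Feeds 0..numElements-1 into a single window and returns the emitted batches.
    static List<List<Long>> run(long numElements, ModelTrigger trigger) {
        List<List<Long>> firings = new ArrayList<>();
        List<Long> buffer = new ArrayList<>();
        for (long i = 0; i < numElements; i++) {
            buffer.add(i);
            if (trigger.onElement(buffer.size()) == Result.FIRE_AND_PURGE) {
                firings.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Event time passes the window end; anything not fired here is dropped.
        if (trigger.onWindowEnd() == Result.FIRE_AND_PURGE && !buffer.isEmpty()) {
            firings.add(new ArrayList<>(buffer));
        }
        return firings;
    }

    public static void main(String[] args) {
        List<List<Long>> countOnly = run(102, countOnlyTrigger(15));
        List<List<Long>> countOrEnd = run(102, countOrWindowEndTrigger(15));
        // 102 = 6 * 15 + 12: count-only emits 6 batches and never emits 90..101.
        System.out.println("count only: " + countOnly.size() + " firings");
        // Count-or-end emits the same 6 batches plus a final batch 90..101.
        System.out.println("count or window end: " + countOrEnd.size()
                + " firings, last = " + countOrEnd.get(countOrEnd.size() - 1));
    }
}
```

In a real job the same decision would go into a custom Trigger subclass
that keeps the element count in partitioned state (as CountTrigger does)
and registers an event-time timer for window.maxTimestamp() in
onElement() (as EventTimeTrigger does), firing from onEventTime() when
that timer goes off.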

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#default-triggers-of-windowassigners

On Sun, Jul 22, 2018 at 11:59 PM, Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
> I have been trying to understand how triggers work in Flink. We have a set
> of data that arrives via Kafka. We need to process the data in a
> window when either of two criteria is satisfied:
> 1) The maximum number of elements in the window has been reached.
> 2) Some maximum time has elapsed (say 5 milliseconds in our case).
>
> I have written the following code:
>
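> import com.google.common.base.Joiner;
> import java.util.stream.LongStream;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class WindowTest {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironment();
>         // Ingestion time: elements are timestamped when they enter the source.
>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
>         // Bounded source emitting 0..101 (102 elements).
>         DataStreamSource<Long> source = env.addSource(new SourceFunction<Long>() {
>
>             @Override
>             public void run(SourceContext<Long> ctx) throws Exception {
>                 LongStream.range(0, 102).forEach(ctx::collect);
>             }
>
>             @Override
>             public void cancel() {
>                 // Nothing to stop: run() returns after emitting all elements.
>             }
>         });
>
>         // 5 ms tumbling windows whose default trigger is replaced by a
>         // purging count trigger that fires every 15 elements.
>         source.timeWindowAll(Time.milliseconds(5))
>                 .trigger(PurgingTrigger.of(CountTrigger.of(15)))
>                 .apply(new AllWindowFunction<Long, Object, TimeWindow>() {
>                     @Override
>                     public void apply(TimeWindow timeWindow, Iterable<Long> values,
>                                       Collector<Object> collector) throws Exception {
>                         System.out.println("processing a window");
>                         System.out.println(Joiner.on(',').join(values));
>                     }
>                 })
>                 .print();
>
>         env.execute("test-program");
>     }
> }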
>
> Here is the output I get when I run this code:
>
> processing a window
> 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
> processing a window
> 15,16,17,18,19,20,21,22,23,24,25,26,27,28,29
> processing a window
> 30,31,32,33,34,35,36,37,38,39,40,41,42,43,44
> processing a window
> 45,46,47,48,49,50,51,52,53,54,55,56,57,58,59
> processing a window
> 60,61,62,63,64,65,66,67,68,69,70,71,72,73,74
> processing a window
> 75,76,77,78,79,80,81,82,83,84,85,86,87,88,89
>
> As you can see, the data from 90 to 101 is not processed. Shouldn't it be
> processed when the window completes after 5 ms?
>
> When I remove the trigger, all of the data from 0 to 101 does get
> processed.
>
> Any idea why we see this behaviour?
> --
>
> *Regards, Harshvardhan Agrawal*
>
