Hi,

I have written an Apache Flink Pipeline containing the following piece of code (Java):

stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(50))).aggregate(new CustomAggregator()).print();

If I run the pipeline using local execution I see the following behavior. The "CustomAggregator" calls the `createAccumulator` and `add` methods correctly with the correct data. However it never calls `getResult` and my pipeline simply finishes.

So I did a little research and found out that it works if I change the code to:

stream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(1))).aggregate(new CustomAggregator()).print();

Notice the reduced gap time for the processing time session window. So it seems that execution only continues if the window has been closed and if that takes too long, the execution simply aborts. I guess another factor playing a part in the problem is, that my initial data is read in much faster than 50 seconds. This results in the pipeline being in a state where it only waits for the window to be closed and having nothing else to do it decides that there is no work left and simply shuts down.

My question now is if it is possible to tell the local execution environment to wait for that window to be closed, instead of just shutting down.

Thanks and Regards

    Klemens Muthmann

Reply via email to