Hi Steve,

I think the different behaviour comes from the way event time and
processing time are implemented.

When you are using event time, watermarks travel through the topology to
denote the current event time. When your source terminates, the system
sends a watermark with Long.MAX_VALUE through the topology. This
effectively triggers the completion of all pending event time operations.
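
To make the effect of that final watermark concrete, here is a toy model
in plain Java. It is only a sketch, not Flink's actual timer service: the
class EventTimeTimers and its methods are hypothetical names invented for
illustration. It shows that a timer fires once the watermark reaches its
timestamp, so a watermark of Long.MAX_VALUE flushes everything still
pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of event-time timers (hypothetical, NOT Flink's implementation).
class EventTimeTimers {
    // Pending timers, ordered by the event-time timestamp at which they fire.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    void register(long timestamp, String name) {
        pending.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(name);
    }

    // Fires every timer whose timestamp is <= the given watermark.
    void advanceWatermark(long watermark) {
        while (!pending.isEmpty() && pending.firstKey() <= watermark) {
            fired.addAll(pending.pollFirstEntry().getValue());
        }
    }

    List<String> fired() {
        return fired;
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }
}
```

Registering two timers and then calling advanceWatermark(Long.MAX_VALUE)
leaves nothing pending, which is the behaviour you saw with event time.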

In the case of processing time, Flink does not do this. Instead it simply
relies on the processing time clocks on each machine. Hence, there is no
way for Flink to tell the different machines that their respective
processing time clocks should advance to a certain time in case of a
shutdown. Instead you should make sure that you don't terminate the job
before enough processing time has passed (for your session windows, more
than the gap). You could do this by adding a sleep to your source function
after you've output all records and just before leaving the source loop.
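
A minimal sketch of such a source, written in plain Java so that it
compiles without Flink on the classpath. The names LingeringSource,
EmitContext and lingerMillis are hypothetical. In a real job the class
would implement Flink's SourceFunction, and EmitContext stands in for the
SourceContext passed to run().

```java
import java.util.List;

// Stand-in for Flink's SourceFunction.SourceContext (assumption, so the
// sketch is self-contained).
interface EmitContext<T> {
    void collect(T element);
}

// Hypothetical source: emits a fixed list, then lingers before returning.
class LingeringSource<T> {
    private final List<T> elements;
    private final long lingerMillis;
    private volatile boolean running = true;

    LingeringSource(List<T> elements, long lingerMillis) {
        this.elements = elements;
        this.lingerMillis = lingerMillis;
    }

    void run(EmitContext<T> ctx) {
        for (T element : elements) {
            if (!running) {
                return;
            }
            ctx.collect(element);
        }
        // All records are out. Stay alive so processing-time timers
        // downstream get a chance to fire before the job finishes.
        long deadline = System.currentTimeMillis() + lingerMillis;
        while (running) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            try {
                // Sleep in short slices so cancel() is noticed quickly.
                Thread.sleep(Math.min(remaining, 10L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

For the test, lingerMillis would need to be larger than the session gap,
with some margin, since the window only fires once the gap has elapsed on
the wall clock.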

Cheers,
Till

On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com>
wrote:

> Hello!
>
> I am trying to write a test that runs in the TestEnviroment. I create a
> process that uses ProcessingTime, has a source constructed from a
> FromElementsFunction and runs data through a Keyed Stream into
> a ProcessingTimeSessionWindows.withGap().
>
> The problem is that it appears that the env.execute method returns
> immediately after the session closes, not allowing the events to be
> released from the window before shutdown occurs. This used to work when I
> used EventTime.
>
> Thoughts?
> -Steve
>
