Hi Steve,

AFAIK there is no such testing source in Flink. I agree that Flink's testing
utilities should be improved. If you implement such a source, then you
might be able to contribute it back to the community. That would be super
helpful.
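
In case it helps as a starting point, here is a rough sketch of the idea from my earlier mail (quoted below): emit all records, then keep the source alive for a while so that processing-time timers can still fire before the job finishes. This is plain Java and not an existing Flink class. The names LingeringSource and lingerMillis are made up for illustration, and the Consumer is only a stand-in for what would be SourceContext#collect inside SourceFunction#run in a real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical test source: emits a fixed list of elements, then lingers
 * before returning. Not part of Flink; in a real job this logic would live
 * in SourceFunction#run, with the Consumer replaced by SourceContext#collect.
 */
class LingeringSource<T> {
    private final List<T> elements;
    private final long lingerMillis;          // assumed name: how long to stay alive after the last record
    private volatile boolean running = true;  // flipped by cancel(), mirroring SourceFunction#cancel

    LingeringSource(List<T> elements, long lingerMillis) {
        this.elements = elements;
        this.lingerMillis = lingerMillis;
    }

    void run(Consumer<T> out) {
        // Emit every record first.
        for (T element : elements) {
            if (!running) {
                return;
            }
            out.accept(element);
        }
        // Then wait in short slices so that cancel() is noticed promptly.
        long deadline = System.nanoTime() + lingerMillis * 1_000_000L;
        while (running) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                break;
            }
            try {
                Thread.sleep(Math.min(10L, remainingNanos / 1_000_000L + 1));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop lingering.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}

class LingeringSourceDemo {
    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        LingeringSource<String> source =
                new LingeringSource<>(Arrays.asList("a", "b", "c"), 200L);
        source.run(collected::add);
        System.out.println(collected);
    }
}
```

For a ProcessingTimeSessionWindows test, lingerMillis would have to be noticeably larger than the session gap, otherwise the window still will not fire before the job shuts down.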

Cheers,
Till

On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com>
wrote:

>
> That’s what I figured was happening :( Your explanation is a lot better
> than what I gave to my team, so that will help a lot, thank you!
>
> Is there a testing source already created that does this sort of thing?
> The Flink-testing library seems a bit sparse.
>
> -Steve
>
> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Steve,
>
> I think the different behaviour is due to the way event time and
> processing time are implemented.
>
> When you are using event time, watermarks need to travel through the
> topology denoting the current event time. When your source terminates, the
> system will send a watermark with Long.MAX_VALUE through the topology. This
> will effectively trigger the completion of all pending event time
> operations.
>
> In the case of processing time, Flink does not do this. Instead it simply
> relies on the processing time clocks on each machine. Hence, there is no
> way for Flink to tell the different machines that their respective
> processing time clocks should proceed to a certain time in case of a
> shutdown. Instead you should make sure that you don't terminate the job
> before a certain time (processing time) has passed. You could do this by
> adding a sleep to your source function after you've output all records and
> just before leaving the source loop.
>
> Cheers,
> Till
>
> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com>
> wrote:
>
>> Hello!
>>
>> I am trying to write a test that runs in the TestEnvironment. I create a
>> process that uses ProcessingTime, has a source constructed from a
>> FromElementsFunction and runs data through a Keyed Stream into
>> a ProcessingTimeSessionWindows.withGap().
>>
>> The problem is that it appears that the env.execute method returns
>> immediately after the session closes, not allowing the events to be
>> released from the window before shutdown occurs. This used to work when I
>> used EventTime.
>>
>> Thoughts?
>> -Steve
>>
>
