Hi Steve, afaik there is no such thing in Flink. I agree that Flink's testing utilities should be improved. If you implement such a source, then you might be able to contribute it back to the community. That would be super helpful.
Cheers, Till On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snel...@sourceallies.com> wrote: > > That’s what I figured was happening :( Your explanation is a lot better > than what I gave to my team, so that will help a lot, thank you! > > Is there a testing source already created that does this sort of thing? > The Flink-testing library seems a bit sparse. > > -Steve > > Sent from my iPhone > > On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrm...@apache.org> wrote: > > Hi Steve, > > I think the reason for the different behaviour is due to the way event > time and processing time are implemented. > > When you are using event time, watermarks need to travel through the > topology denoting the current event time. When you source terminates, the > system will send a watermark with Long.MAX_VALUE through the topology. This > will effectively trigger the completion of all pending event time > operations. > > In the case of processing time, Flink does not do this. Instead it simply > relies on the processing time clocks on each machine. Hence, there is no > way for Flink to tell the different machines that their respective > processing time clocks should proceed to a certain time in case of a > shutdown. Instead you should make sure that you don't terminate the job > before a certain time (processing time) has passed. You could do this by > adding a sleep to your source function after you've output all records and > just before leaving the source loop. > > Cheers, > Till > > On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snel...@sourceallies.com> > wrote: > >> Hello! >> >> I am trying to write a test that runs in the TestEnviroment. I create a >> process that uses ProcessingTime, has a source constructed from a >> FromElementsFunction and runs data through a Keyed Stream into >> a ProcessingTimeSessionWindows.withGap(). >> >> The problem is that it appears that the env.execute method returns >> immediately after the session closes, not allowing the events to be >> released from the window before shutdown occurs. This used to work when I >> used EventTime. >> >> Thoughts? >> -Steve >> >