I understand. Thanks for looking into it, Senthil!

Kostas
On Tue, Jun 9, 2020 at 7:32 PM Senthil Kumar <senthi...@vmware.com> wrote:
>
> OK, will do and report back.
>
> We are on 1.9.1,
>
> 1.10 will take some time
>
> On 6/9/20, 2:06 AM, "Kostas Kloudas" <kklou...@gmail.com> wrote:
>
> Hi Senthil,
>
> From a quick look at the code, it seems that the cancel() of your
> source function should be called, and the thread that it is running on
> should be interrupted.
>
> In order to pin down the problem and help us see if this is an actual
> bug, could you please:
>
> 1) enable debug logging and see if you can spot some lines like this:
>
> "Starting checkpoint (XXXX-ID) SYNC_SAVEPOINT on task XXXXX" or something
> similar with synchronous savepoint in it
>
> and any other message afterwards with XXXX-ID in it to see if the
> savepoint is completed successfully.
>
> 2) could you see if this behavior persists in Flink 1.10?
>
> Thanks,
> Kostas
>
> On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar <senthi...@vmware.com> wrote:
> >
> > Robert,
> >
> > Thank you once again! We are currently doing the “short” Thread.sleep()
> > approach. Seems to be working fine.
> >
> > Cheers
> > Kumar
> >
> > From: Robert Metzger <rmetz...@apache.org>
> > Date: Tuesday, June 2, 2020 at 2:40 AM
> > To: Senthil Kumar <senthi...@vmware.com>
> > Cc: "user@flink.apache.org" <user@flink.apache.org>
> > Subject: Re: Age old stop vs cancel debate
> >
> > Hi Kumar,
> >
> > this is more a Java question than a Flink question now :) If it is
> > easily possible from your code, then I would regularly check the isRunning
> > flag (by using short Thread.sleep() calls) to have a proper cancellation
> > behavior.
> >
> > If this makes your code very complicated, then you could work with
> > manually interrupting your worker thread. I would only use this method if
> > you are sure your code (and the libraries you are using) handle
> > interrupts properly.
> >
> > Sorry that I cannot give you a more actionable response. It depends a
> > lot on the structure of your code and the libraries you are calling into.
> >
> > Best,
> > Robert
> >
> > On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <senthi...@vmware.com>
> > wrote:
> >
> > Hi Robert,
> >
> > Would appreciate more insights please.
> >
> > What we are noticing: When the Flink job is issued a stop command, the
> > Thread.sleep is not receiving the InterruptedException.
> >
> > It certainly receives the exception when the Flink job is issued a
> > cancel command.
> >
> > In both cases (cancel and stop) the cancel() method is getting called
> > (to set the isRunning variable to false).
> >
> > However, given that the thread does not get interrupted in stop, we are
> > not in a position to check the isRunning variable.
> >
> > For now, we are doing a Thread.sleep every 5 minutes (instead of the
> > normal interval which is in hours).
> > Sleeping for 5 minutes gives us a chance to check the isRunning
> > variable.
> >
> > Another approach would be to save the current thread
> > (Thread.currentThread()) before doing a Thread.sleep()
> > and manually calling Thread.interrupt() from the cancel function.
> >
> > What is your recommendation?
> >
> > Cheers
> > Kumar
> >
> > From: Robert Metzger <rmetz...@apache.org>
> > Date: Friday, May 29, 2020 at 4:38 AM
> > To: Senthil Kumar <senthi...@vmware.com>
> > Cc: "user@flink.apache.org" <user@flink.apache.org>
> > Subject: Re: Age old stop vs cancel debate
> >
> > Hi Kumar,
> >
> > The way you've implemented your custom source sounds like the right
> > way: Having a "running" flag checked by the run() method and changing it
> > in cancel().
> >
> > Also, it is good that you are properly handling the interrupt set by
> > Flink (some people ignore InterruptedExceptions, which makes it difficult
> > (basically impossible) for Flink to stop the job).
> >
> > Best,
> > Robert
> >
> > On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <senthi...@vmware.com>
> > wrote:
> >
> > We are on Flink 1.9.0
> >
> > I have a custom SourceFunction, where I rely on isRunning set to false
> > via the cancel() function to exit out of the run loop.
> > My run loop essentially gets the data from S3, and then simply sleeps
> > (Thread.sleep) for a specified time interval.
> >
> > When a job gets cancelled, the SourceFunction.cancel() is called, which
> > sets isRunning to false.
> > In addition, the Thread.sleep gets interrupted, a check is made on the
> > isRunning variable (set to false now) and the run loop is exited.
> >
> > We noticed that when we “stop” the Flink job, the Thread.sleep does not
> > get interrupted.
> > It also appears that SourceFunction.cancel() is not getting called
> > (which seems like the correct behavior for “stop”).
> >
> > My question: what’s the “right” way to exit the run() loop when the
> > Flink job receives a stop command?
> >
> > My understanding was that there was a Stoppable interface (which got
> > removed in 1.9.0).
> >
> > Would appreciate any insights.
> >
> > Cheers
> > Kumar
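
For readers following the thread: the two approaches discussed above (short sleeps that re-check isRunning, and saving the worker thread so that cancel() can interrupt it) can be combined. The following is only a minimal sketch in plain Java, not Flink code. PollingSource, fetch(), pollIntervalMs and sliceMs are hypothetical names chosen for illustration; in a real job the run()/cancel() logic would presumably sit inside the custom SourceFunction.

```java
// Plain-Java sketch; it deliberately does not depend on Flink.
// All names here are hypothetical and chosen only for illustration.
class PollingSource {
    private final long pollIntervalMs; // long wait between two fetches (e.g. hours)
    private final long sliceMs;        // how often isRunning is re-checked while waiting

    private volatile boolean isRunning = true;
    private volatile Thread worker;    // thread executing run(), saved so cancel() can interrupt it
    private volatile int fetchCount = 0;

    PollingSource(long pollIntervalMs, long sliceMs) {
        this.pollIntervalMs = pollIntervalMs;
        this.sliceMs = sliceMs;
    }

    // Main loop: fetch, then wait for the poll interval, until cancel() is called.
    void run() {
        worker = Thread.currentThread();
        while (isRunning) {
            fetch();
            sleepInSlices();
        }
    }

    // Stand-in for the real work (reading from S3 in the thread above).
    private void fetch() {
        fetchCount = fetchCount + 1; // only the worker thread writes this field
    }

    // Waits up to pollIntervalMs, but wakes up every sliceMs to re-check isRunning,
    // so the loop can exit even if nobody interrupts the thread.
    private void sleepInSlices() {
        long deadline = System.nanoTime() + pollIntervalMs * 1_000_000L;
        while (isRunning) {
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return; // poll interval elapsed, time for the next fetch
            }
            try {
                Thread.sleep(Math.min(sliceMs, remainingMs));
            } catch (InterruptedException e) {
                // Interrupted (by cancel() below or by the framework): stop waiting
                // and let the outer loop decide based on isRunning.
                return;
            }
        }
    }

    // Clears the flag and, as the optional second approach, interrupts the saved thread.
    void cancel() {
        isRunning = false;
        Thread t = worker;
        if (t != null) {
            t.interrupt();
        }
    }

    int fetchCount() {
        return fetchCount;
    }
}
```

With a one-hour poll interval and a short slice, calling cancel() from another thread lets run() return within roughly one slice even if no interrupt arrives. As Robert notes, the interrupt path is only advisable if the libraries called from the loop handle interrupts properly; the sliced sleep alone does not rely on that.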