Robert,

Thank you once again! We are currently doing the “short” Thread.sleep() 
approach. Seems to be working fine.

Cheers
Kumar

From: Robert Metzger <rmetz...@apache.org>
Date: Tuesday, June 2, 2020 at 2:40 AM
To: Senthil Kumar <senthi...@vmware.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

this is more a Java question than a Flink question now :) If it is easily 
possible from your code, then I would regularly check the isRunning flag (by 
having short Thread.sleeps()) to have a proper cancellation behavior.
If this makes your code very complicated, then you could work with manually 
interrupting your worker thread. I would only use this method if you are sure 
your code (and the libraries you are using) are properly handling interrupts.
Sorry that I can not give you a more actionable response. It depends a lot on 
the structure of your code and the libraries you are calling into.

Best,
Robert


On Fri, May 29, 2020 at 10:48 PM Senthil Kumar 
<senthi...@vmware.com<mailto:senthi...@vmware.com>> wrote:
Hi Robert,

Would appreciate more insights please.

What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException

It certainly receives the exception when the flink job is issued a cancel 
command.

In both cases (cancel and stop) the cancel() method is getting called (to set 
the isRunning variable to false)

However, given that the thread does not get interrupted in stop, we are not in 
a position to check the isRunning variable.


For now, we are doing a Thread.sleep  every 5 minutes (instead of the normal 
interval which is in hours).
Sleeping for 5 minutes gives us a chance to check the isRunning variable.

Another approach would be to save the currentThread (Thread.currentThread()) 
before doing a Thread.sleep())
and manually calling Thread.interrupt() from the cancel function.

What is your recommendation?

Cheers
Kumar


From: Robert Metzger <rmetz...@apache.org<mailto:rmetz...@apache.org>>
Date: Friday, May 29, 2020 at 4:38 AM
To: Senthil Kumar <senthi...@vmware.com<mailto:senthi...@vmware.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
<user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

They way you've implemented your custom source sounds like the right way: 
Having a "running" flag checked by the run() method and changing it in cancel().
Also, it is good that you are properly handling the interrupt set by Flink 
(some people ignore InterruptedExceptions, which make it difficult (basically 
impossible) for Flink to stop the job)

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar 
<senthi...@vmware.com<mailto:senthi...@vmware.com>> wrote:
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
the isRunning to false.
In addition, the Thread.sleep gets interrupted, a check Is made on the 
isRunning variable (set to false now) and the run loop is exited.

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SoruceFunction.cancel() is not getting called (which seems 
like the correct behavior for “stop”)

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar

Reply via email to