Hey Kumar, if you are swallowing any and all exceptions, your Flink job will not fail because of issues arising from your custom source. It might make sense to stop the source if you are catching an InterruptedException.
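To illustrate the InterruptedException point, here is a minimal sketch in plain Java with no Flink dependency. The names `PollingSourceSketch`, `work`, and `sleepMillis` are hypothetical, and a `Consumer<String>` merely stands in for Flink's `SourceContext`. It only shows the control flow: failures from the work step propagate out of `run`, while an interrupt during the sleep stops the loop instead of being swallowed.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a periodic source; not Flink's SourceFunction API.
class PollingSourceSketch {
    // Volatile so that a cancel() from another thread is visible to run().
    private volatile boolean isRunning = true;
    private final Supplier<String> work;   // assumed "do some work" step
    private final long sleepMillis;        // assumed polling interval

    PollingSourceSketch(Supplier<String> work, long sleepMillis) {
        this.work = work;
        this.sleepMillis = sleepMillis;
    }

    void cancel() {
        isRunning = false;
    }

    void run(Consumer<String> collector) {
        while (isRunning) {
            // No try/catch here: an exception from the work step leaves run(),
            // which is how a real source would signal failure to the framework.
            collector.accept(work.get());

            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of swallowing it.
                Thread.currentThread().interrupt();
                isRunning = false;
            }
        }
    }
}
```

In a real Flink source the `collector.accept(...)` call would be `ctx.collect(...)`, and whether to rethrow the interrupt or just leave the loop is a design choice; this sketch simply leaves the loop.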
Throwing exceptions out of the run method signals to the Flink framework that the source has failed, and thus the job will fail / go into recovery.
The way you are using the cancel() method together with the isRunning variable is correct for proper cancellation behavior of the source.

On Fri, May 8, 2020 at 3:31 AM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> Some suggestions from my side:
> - synchronized (checkpointLock) to do some work and ctx.collect?
> - Put Thread.sleep(interval) out of try catch? Maybe should not
>   swallow interrupt exception (Like cancel the job).
>
> Best,
> Jingsong Lee
>
> On Fri, May 8, 2020 at 2:52 AM Senthil Kumar <senthi...@vmware.com> wrote:
>
>> I am implementing a source function which periodically wakes up and
>> consumes data from S3.
>>
>> My current implementation is like so.
>>
>> Following:
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction
>>
>> Is it safe to simply swallow any and all exceptions in the run method and
>> just rely on this.isRunning variable to quit the run() method?
>>
>> Cheers
>>
>> Kumar
>>
>> ---
>>
>> @Override
>> public void cancel() {
>>     this.isRunning = false; // Set volatile state variable, initially set to true on Class
>> }
>>
>> @Override
>> public void run(SourceFunction.SourceContext<OUT> ctx) {
>>     while (this.isRunning) {
>>         try {
>>             OUT out = /* Do some work */
>>             ctx.collect(out);
>>
>>             // Hours to milliseconds
>>             Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000);
>>         } catch (Throwable t) {
>>             // Simply swallow
>>         }
>>     }
>> }
>>
>
> --
> Best, Jingsong Lee