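Hi,

Some suggestions from my side:

- Do the work that updates state, together with ctx.collect, inside synchronized (checkpointLock), i.e. the lock returned by ctx.getCheckpointLock()?
- Move Thread.sleep(interval) out of the try/catch? The InterruptedException probably should not be swallowed, because an interrupt can mean the job is being cancelled.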
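A minimal sketch of what I mean, not a definitive implementation. To keep it compilable without Flink on the classpath, SourceContext below is a small stand-in for SourceFunction.SourceContext (only collect and getCheckpointLock), and PeriodicSource, fetchOnce and sleepMillis are hypothetical names for your class, your S3 read and your interval:

```java
import java.util.function.Supplier;

/** Stand-in for Flink's SourceFunction.SourceContext so the sketch compiles on its own. */
interface SourceContext<T> {
    void collect(T record);
    Object getCheckpointLock();
}

/** Hypothetical periodic source; fetchOnce stands for the "do some work" step. */
class PeriodicSource<T> {
    private volatile boolean isRunning = true;
    private final Supplier<T> fetchOnce;
    private final long sleepMillis;

    PeriodicSource(Supplier<T> fetchOnce, long sleepMillis) {
        this.fetchOnce = fetchOnce;
        this.sleepMillis = sleepMillis;
    }

    public void cancel() {
        isRunning = false;
    }

    public void run(SourceContext<T> ctx) throws InterruptedException {
        while (isRunning) {
            try {
                T out = fetchOnce.get();
                // Emit under the checkpoint lock; any state updates that must be
                // consistent with the emitted record belong in this block too.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(out);
                }
            } catch (RuntimeException e) {
                // Assumption: a failed poll is recoverable, so report it and
                // try again on the next cycle instead of silently dropping it.
                System.err.println("poll failed, will retry: " + e);
            }
            // Outside the try block: an InterruptedException propagates to the
            // caller instead of being swallowed, so the loop can actually end.
            Thread.sleep(sleepMillis);
        }
    }
}
```

With a sleep measured in hours, the isRunning flag alone is only checked once per cycle. As far as I know, Flink also interrupts the task thread on cancel, which is why letting the InterruptedException escape matters. If the interval stays in hours, TimeUnit.HOURS.toMillis(hours) avoids the int overflow that hours * 60 * 60 * 1000 can hit for large values.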
Best,
Jingsong Lee

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar <senthi...@vmware.com> wrote:

> I am implementing a source function which periodically wakes up and
> consumes data from S3.
>
> My current implementation is like so, following
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.
>
> Is it safe to simply swallow any and all exceptions in the run method and
> just rely on the this.isRunning variable to quit the run() method?
>
> Cheers
> Kumar
>
> ---
>
> @Override
> public void cancel() {
>     this.isRunning = false; // Set volatile state variable, initially set to true on Class
> }
>
> @Override
> public void run(SourceFunction.SourceContext<OUT> ctx) {
>     while (this.isRunning) {
>         try {
>             OUT out = /* Do some work */
>             ctx.collect(out);
>
>             Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milli seconds
>         } catch (Throwable t) {
>             // Simply swallow
>         }
>     }
> }