I'm a bit confused about this too actually. I think the above would work as
a solution if you want to continuously monitor a directory, but for a
"PROCESS_ONCE" readFile source I don't think you will get a checkpoint
emitted indicating the end of the stream.
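For comparison, here is a minimal sketch of the continuous-monitoring variant with checkpointing switched on. It assumes the Flink 1.4/1.5 DataStream API. The class name, the 10 s checkpoint interval and the default path /tmp/input are arbitrary placeholders, not values from this thread.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousReadWithCheckpoints {

    // Placeholder values for this sketch (hypothetical, not from the thread).
    static final long CHECKPOINT_INTERVAL_MS = 10_000L;
    static final long SCAN_INTERVAL_MS = 1000L;

    /** Enables checkpointing and builds the file source; nothing runs until execute(). */
    static DataStream<String> buildSource(StreamExecutionEnvironment env, String inputPath) {
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        return env.readFile(
                new TextInputFormat(new Path(inputPath)),
                inputPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, // keep re-scanning the directory
                SCAN_INTERVAL_MS);                        // time between directory scans (ms)
    }

    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : "/tmp/input"; // hypothetical default
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildSource(env, inputPath).print();
        env.execute("readFile with checkpointing");
    }
}
```

Because the source keeps running in this mode, checkpoints keep being triggered periodically, which is what makes the continuous case work.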


Digging into the Java code (ContinuousFileMonitoringFunction#run), I found this:

case PROCESS_ONCE:
   synchronized (checkpointLock) {

      // the following check guarantees that if we restart
      // after a failure and we managed to have a successful
      // checkpoint, we will not reprocess the directory.

      if (globalModificationTime == Long.MIN_VALUE) {
         monitorDirAndForwardSplits(fileSystem, context);
         globalModificationTime = Long.MAX_VALUE;
      }
      isRunning = false;
   }
   break;

My understanding of this is that no checkpoints can be created while the
directory is being read, because the whole read happens while holding
checkpointLock. Once it is read, the isRunning flag is set to false,
which means no new checkpoints are emitted.
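To make that reasoning concrete, here is a toy model in plain Java. It is NOT Flink code; the class and method names are hypothetical. It assumes, as a simplification of what the Flink task does, that a checkpoint trigger must take the same lock and is declined once the source has stopped running. A checkpoint requested while the scan holds the lock has to wait until the scan finishes, and by then the source is already stopped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Toy model of the PROCESS_ONCE locking pattern; NOT Flink's actual classes. */
public class ProcessOnceLockModel {
    final Object checkpointLock = new Object();
    private boolean isRunning = true;                  // only touched while holding the lock
    private final List<String> events = new ArrayList<>();

    /** Mirrors the PROCESS_ONCE branch: forward every split under the lock, then stop. */
    void runOnce(List<String> splits, CountDownLatch scanStarted) {
        synchronized (checkpointLock) {
            scanStarted.countDown();                   // signal: the lock is now held
            for (String split : splits) {
                events.add("forward " + split);
            }
            isRunning = false;
        }
    }

    /** Hypothetical checkpoint trigger: needs the same lock and a running source. */
    boolean tryCheckpoint(long checkpointId) {
        synchronized (checkpointLock) {
            if (!isRunning) {
                events.add("checkpoint " + checkpointId + " declined");
                return false;
            }
            events.add("checkpoint " + checkpointId);
            return true;
        }
    }

    /** Requests a checkpoint while the scan is in progress and returns the event log. */
    static List<String> simulate(List<String> splits) {
        ProcessOnceLockModel model = new ProcessOnceLockModel();
        CountDownLatch scanStarted = new CountDownLatch(1);
        Thread source = new Thread(() -> model.runOnce(splits, scanStarted));
        source.start();
        try {
            scanStarted.await();       // the source thread now holds the lock
            model.tryCheckpoint(1);    // blocks until the scan releases the lock
            source.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (model.checkpointLock) {
            return new ArrayList<>(model.events);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(Arrays.asList("file-1", "file-2")));
        // prints [forward file-1, forward file-2, checkpoint 1 declined]
    }
}
```

In this model there is no moment between "all splits forwarded" and "source stopped" at which a checkpoint could be taken.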

Is this correct? If so, is it possible to somehow force a checkpoint
to be emitted when the source completes?



On Tue, May 22, 2018 at 3:24 AM Amit Jain <aj201...@gmail.com> wrote:

> Hi Alex,
>
> StreamExecutionEnvironment#readFile is a helper method that creates a
> file-reading streaming source. It uses ContinuousFileReaderOperator and
> ContinuousFileMonitoringFunction internally.
>
> Both the file reader operator and the monitoring function support
> checkpointing, and therefore so does readFile [1]. You can go with the
> first approach (readFile).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String-org.apache.flink.streaming.api.functions.source.FileProcessingMode-long-org.apache.flink.api.common.typeinfo.TypeInformation-
>
>
> --
> Thanks,
> Amit
>
>
> On Mon, May 21, 2018 at 9:39 PM, NEKRASSOV, ALEXEI <an4...@att.com> wrote:
> > I want to add checkpointing to my program that reads from a set of files in
> > a directory. Without checkpointing I use readFile():
> >
> >     DataStream<String> text = env.readFile(
> >             new TextInputFormat(new Path(inputPath)),
> >             inputPath,
> >             inputProcessingMode,
> >             1000);  // monitoring interval (ms)
> >
> > Should I use ContinuousFileMonitoringFunction / ContinuousFileReaderOperator
> > to add checkpointing? Or is there an easier way?
> >
> > How do I go from splits (that ContinuousFileMonitoringFunction provides) to
> > actual strings? I’m not clear how ContinuousFileReaderOperator can be used.
> >
> >     DataStreamSource<TimestampedFileInputSplit> split = env.addSource(
> >             new ContinuousFileMonitoringFunction<String>(
> >                     new TextInputFormat(new Path(inputPath)),
> >                     inputProcessingMode,
> >                     1,      // readerParallelism
> >                     1000)   // monitoring interval (ms)
> >     );
> >
> > Thanks,
> > Alex
>
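On the question of going from splits to strings: since readFile wires ContinuousFileMonitoringFunction and ContinuousFileReaderOperator together internally, one way to do roughly the same by hand is to feed the monitoring function's splits into the reader operator with DataStream#transform. This is a minimal sketch assuming the Flink 1.4/1.5 APIs. Both classes are internal and their constructors may differ in later releases. The operator names, the checkpoint interval and the default path are hypothetical placeholders.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;

public class ManualFileSource {

    /** Splits from the monitoring function are turned into lines by the reader operator. */
    static DataStream<String> build(StreamExecutionEnvironment env, String inputPath,
                                    FileProcessingMode mode) {
        TextInputFormat format = new TextInputFormat(new Path(inputPath));

        // Non-parallel source: lists the path and emits TimestampedFileInputSplits.
        DataStream<TimestampedFileInputSplit> splits = env.addSource(
                new ContinuousFileMonitoringFunction<String>(
                        format, mode, env.getParallelism(), 1000L), // readerParallelism, interval (ms)
                "File Monitor");

        // Reader operator: opens each split with the same format and emits its lines.
        return splits.transform(
                "Split Reader",
                BasicTypeInfo.STRING_TYPE_INFO,
                new ContinuousFileReaderOperator<String>(format));
    }

    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : "/tmp/input"; // hypothetical default
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);                            // placeholder interval
        build(env, inputPath, FileProcessingMode.PROCESS_CONTINUOUSLY).print();
        env.execute("monitoring function + reader operator");
    }
}
```

Since readFile already does this wiring, calling readFile and enabling checkpointing on the environment is usually the simpler choice; the manual version mainly shows where the splits go.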
