It works well now with the following code:
——
TextInputFormat specFileFormat = new TextInputFormat(new Path(specFile));
specFileFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
// PROCESS_CONTINUOUSLY keeps the file source running; the path is re-scanned every 100 ms.
DataStream<String> specificationFileStream = env
    .readFile(specFileFormat, specFile, FileProcessingMode.PROCESS_CONTINUOUSLY, 100L, BasicTypeInfo.STRING_TYPE_INFO);
——
Thanks.

> On 18 Jun 2019, at 3:38 PM, Yun Tang <myas...@live.com> wrote:
>
> Hi Sung
>
> How about using FileProcessingMode.PROCESS_CONTINUOUSLY [1] as the watch type
> when reading data from HDFS? FileProcessingMode.PROCESS_CONTINUOUSLY
> periodically monitors the source, while the default
> FileProcessingMode.PROCESS_ONCE processes the data only once and then exits.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/datastream_api.html#data-sources
>
> Best
> Yun Tang
>
> From: Sung Gon Yi <skonmem...@mac.com>
> Sent: Tuesday, June 18, 2019 14:13
> To: user@flink.apache.org
> Subject: Checkpointing & File stream with
>
> Hello,
>
> I am working on joining two streams: one from Kafka and the other from a
> small file.
> Stream processing works well, but checkpointing fails.
> The file has fewer than 100 lines, and the part of the pipeline that reads it
> goes to the FINISHED state as soon as the job is deployed.
>
> After that, checkpointing fails with the following message:
> ——
> 2019-06-17 20:25:13,575 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Custom File Source (1/1) of job d26afe055f249c172c1dcb3311508e83 is not in state RUNNING but FINISHED instead. Aborting checkpoint.
> ——
>
> "Custom File Source" corresponds to the following code:
> ——
> DataStream<String> specificationFileStream = env.readTextFile(specFile);
> ——
>
> To make checkpointing succeed, I wrote a custom source function that keeps
> running (mostly sleeping after it reads the file). I wonder whether this is
> the correct way.
>
> Sincerely,
> Sung Gon
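
For reference, the workaround described in the original question (a source that reads the file once and then stays alive so that it never reaches FINISHED) might look roughly like the sketch below. It is plain Java with no Flink dependency, so it only illustrates the run/cancel pattern; the class name KeepAliveFileSource, the idleMillis parameter, and the Consumer-based collector are hypothetical stand-ins, not Flink API. In a real job the same logic would presumably sit inside a SourceFunction<String>, with the collector replaced by the SourceContext.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: emits every line of a small file once, then idles
 * until cancel() is called, so the source never finishes on its own.
 */
class KeepAliveFileSource {
    private final Path file;
    private final long idleMillis;
    // volatile so that a cancel() from another thread is seen by run()
    private volatile boolean running = true;

    KeepAliveFileSource(Path file, long idleMillis) {
        this.file = file;
        this.idleMillis = idleMillis;
    }

    /** Reads the file once, hands each line to the collector, then sleeps in a loop. */
    void run(Consumer<String> collector) {
        List<String> lines;
        try {
            lines = Files.readAllLines(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String line : lines) {
            if (!running) {
                return;
            }
            collector.accept(line);
        }
        // Stay alive after the last line instead of returning.
        while (running) {
            try {
                Thread.sleep(idleMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks run() to return at its next check of the running flag. */
    void cancel() {
        running = false;
    }
}
```

Compared with this, FileProcessingMode.PROCESS_CONTINUOUSLY gives the same "source stays RUNNING" effect without custom code, which is why it was the approach adopted at the top of this thread.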