Hi,

Accessing the file is not the problem. If I put a file in the directory before starting the job, the job reads it correctly, but if I add a file at runtime, the job does not read the newly added file. Let me know if you need more information.
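One thing that may be worth checking: as far as I understand Flink's `ContinuousFileMonitoringFunction` (the monitor used by `readFile(..., FileProcessingMode.PROCESS_CONTINUOUSLY, ...)`), a file is only forwarded if its modification time is strictly greater than the largest modification time the monitor has already processed. A file that lands in the directory with an older timestamp (for example, because the copy preserved the original timestamp) would then be skipped without an error. The Flink-free sketch below only models that rule; the class `ModTimeFilterSketch`, the file names and the timestamps are hypothetical and for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of the modification-time rule: a file is
// forwarded only if its modification time is strictly greater than the
// largest modification time forwarded by earlier scans.
public class ModTimeFilterSketch {

    // Largest modification time forwarded so far (starts below any real value).
    private long globalModificationTime = Long.MIN_VALUE;

    /** One directory scan: returns the files that would be read, in map order. */
    public List<String> scan(Map<String, Long> modTimeByFile) {
        List<String> picked = new ArrayList<>();
        long newGlobal = globalModificationTime;
        for (Map.Entry<String, Long> entry : modTimeByFile.entrySet()) {
            // Files that are not newer than everything already forwarded are ignored.
            if (entry.getValue() > globalModificationTime) {
                picked.add(entry.getKey());
                newGlobal = Math.max(newGlobal, entry.getValue());
            }
        }
        globalModificationTime = newGlobal;
        return picked;
    }

    public static void main(String[] args) {
        ModTimeFilterSketch monitor = new ModTimeFilterSketch();
        Map<String, Long> dir = new LinkedHashMap<>();

        // Present before the job starts: read on the first scan.
        dir.put("file_1.txt", 1_000L);
        System.out.println(monitor.scan(dir)); // [file_1.txt]

        // Arrives later with a newer timestamp: read.
        dir.put("file_2.txt", 2_000L);
        System.out.println(monitor.scan(dir)); // [file_2.txt]

        // Arrives later but carries an older timestamp: never read.
        dir.put("file_3.txt", 1_500L);
        System.out.println(monitor.scan(dir)); // []
    }
}
```

In this model, file_3.txt is present on every later scan but is never read, which would look like the symptom of files added at runtime being ignored. Comparing the modification times of the new files with those of the files already read is a cheap way to rule this in or out.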
Thanks & Regards,
Samir Vasani

On Mon, Aug 30, 2021 at 8:03 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> If I understand correctly, the problem is accessing local files from
> Flink running in Docker.
> Have you tried mounting the local directory into the container, for
> example as a bind mount [1]?
>
> [1] https://docs.docker.com/storage/bind-mounts/
>
> Regards,
> Roman
>
> On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani <samirvas...@gmail.com> wrote:
> >
> > I have a requirement to read files continuously from a specific path.
> >
> > That is, the Flink job should continuously poll the specified location
> > and read the files that arrive there at certain intervals.
> >
> > Example: my location on a Windows machine is C:/inputfiles, which gets
> > file_1.txt at 2:00 PM, file_2.txt at 2:30 PM, and file_3.txt at 3:00 PM.
> >
> > To experiment with this, I used the code below:
> >
> > import org.apache.flink.api.common.io.FilePathFilter;
> > import org.apache.flink.api.java.io.TextInputFormat;
> > import org.apache.flink.core.fs.FileSystem;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> >
> > public class ContinuousFileProcessingTest {
> >
> >     public static void main(String[] args) throws Exception {
> >
> >         final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         env.enableCheckpointing(10);
> >
> >         String localFsURI = "D:\\FLink\\2021_01_01\\";
> >         TextInputFormat format =
> >                 new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
> >         // Default filter skips files whose names start with "." or "_"
> >         format.setFilesFilter(FilePathFilter.createDefaultFilter());
> >
> >         // Re-scan the directory every 100 ms and read new or modified files
> >         DataStream<String> inputStream =
> >                 env.readFile(format, localFsURI,
> >                         FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
> >
> >         SingleOutputStreamOperator<String> soso =
> >                 inputStream.map(String::toUpperCase);
> >         soso.print();
> >         soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
> >
> >         env.execute("read and write");
> >     }
> > }
> >
> > I brought up a Flink cluster using Flink 1.9.2, and I was able to
> > achieve my goal of reading files continuously at some interval.
> >
> > Flink 1.9.2 can bring up a cluster on Windows.
> >
> > But now I have to upgrade Flink from 1.9.2 to 1.12, and we use Docker
> > to bring up the 1.12 cluster (unlike 1.9.2).
> >
> > Instead of the Windows path, I changed the file location to the
> > corresponding Docker location, but the same program does not work there.
> >
> > I need help finding the solution.
> >
> > Thanks in advance.
> >
> > Thanks & Regards,
> > Samir Vasani