Hi, If I understand correctly, the problem is accessing local files from Flink running in docker. Have you tried mounting the local directory into the container, for example as a bind mount [1]?
[1] https://docs.docker.com/storage/bind-mounts/ Regards, Roman On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani <samirvas...@gmail.com> wrote: > > I have a requirement to read a file continously from a specific path. > > Means flink job should continously poll the specified location and read a > file will arrive at this location at certains intervals . > > Example: my location on windows machine is C:/inputfiles get a file > file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM. > > To experimented this with below code . > > import org.apache.flink.api.common.functions.FlatMapFunction; > import org.apache.flink.api.common.io.FilePathFilter; > import org.apache.flink.api.java.io.TextInputFormat; > import org.apache.flink.core.fs.FileSystem; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.source.FileProcessingMode; > import org.apache.flink.util.Collector; > > import java.util.Arrays; > import java.util.List; > > public class ContinuousFileProcessingTest { > > public static void main(String[] args) throws Exception { > > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.enableCheckpointing(10); > String localFsURI = "D:\\FLink\\2021_01_01\\"; > TextInputFormat format = new TextInputFormat(new > org.apache.flink.core.fs.Path(localFsURI)); > format.setFilesFilter(FilePathFilter.createDefaultFilter()); > DataStream<String> inputStream = > env.readFile(format, localFsURI, > FileProcessingMode.PROCESS_CONTINUOUSLY, 100); > SingleOutputStreamOperator<String> soso = > inputStream.map(String::toUpperCase); > soso.print(); > soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE); > env.execute("read and write"); > } > } > > > > > I brought up flink cluster using flink's 1.9.2 and i was able to achieve my > goal of readin file continously at some intervals. > > Flink's 1.9.2 version can bring up cluster on windows. > > But now i have to upgrade the flink's version from 1.9.2 to 1.12 .And we used > docker to bring cluster up on 1.12 (unlike 1.9.2). > > Unlike windows path i changed the file location as per docker location but > the same above program in not running there. > > Need help to find the solution. > > Thanks in advance. > > > Thanks & Regards, > Samir Vasani >