Hi,

Accessing the file is not the problem.
If I put the file in place before starting the job, the job reads it
correctly, but if I add any file at runtime, the job does not read the
newly added files.
Let me know if you need more information.
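In case it helps narrow things down: if the input directory were baked into the image rather than bind-mounted, files added on the host at runtime would never become visible inside the container, which would match this symptom. A bind-mounted setup along the lines of the doc Roman linked would look roughly like this (a sketch only; image tags, service names, and paths are illustrative, not my actual setup):

```yaml
# Sketch: bind-mount the host directory holding the input files into
# both Flink containers, so files added on the host at runtime are
# visible to the job. Paths and image tag are placeholders.
version: "3"
services:
  jobmanager:
    image: flink:1.12
    command: jobmanager
    volumes:
      - /path/on/host/inputfiles:/opt/flink/inputfiles
  taskmanager:
    image: flink:1.12
    command: taskmanager
    volumes:
      - /path/on/host/inputfiles:/opt/flink/inputfiles
```

The job would then point `readFile` at the container-side path (here `/opt/flink/inputfiles`) instead of the host path.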

Thanks & Regards,
Samir Vasani



On Mon, Aug 30, 2021 at 8:03 PM Roman Khachatryan <ro...@apache.org> wrote:

> Hi,
>
> If I understand correctly, the problem is accessing local files from
> Flink running in docker.
> Have you tried mounting the local directory into the container, for
> example as a bind mount [1]?
>
> [1]
> https://docs.docker.com/storage/bind-mounts/
>
> Regards,
> Roman
>
> On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani <samirvas...@gmail.com>
> wrote:
> >
> > I have a requirement to read files continuously from a specific path.
> >
> > That means the Flink job should continuously poll the specified location
> and read any file that arrives there at certain intervals.
> >
> > Example: my location on a Windows machine is C:/inputfiles, and it gets
> file_1.txt at 2:00PM, file_2.txt at 2:30PM, file_3.txt at 3:00PM.
> >
> > I experimented with the code below.
> >
> > import org.apache.flink.api.common.functions.FlatMapFunction;
> > import org.apache.flink.api.common.io.FilePathFilter;
> > import org.apache.flink.api.java.io.TextInputFormat;
> > import org.apache.flink.core.fs.FileSystem;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
> > import org.apache.flink.util.Collector;
> >
> > import java.util.Arrays;
> > import java.util.List;
> >
> > public class ContinuousFileProcessingTest {
> >
> >     public static void main(String[] args) throws Exception {
> >         final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         env.enableCheckpointing(10);
> >         String localFsURI = "D:\\FLink\\2021_01_01\\";
> >         // Monitor the directory and re-scan it for new files.
> >         TextInputFormat format =
> >                 new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
> >         format.setFilesFilter(FilePathFilter.createDefaultFilter());
> >         DataStream<String> inputStream =
> >                 env.readFile(format, localFsURI,
> >                         FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
> >         SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
> >         soso.print();
> >         soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
> >         env.execute("read and write");
> >     }
> > }
> >
> >
> >
> >
> > I brought up a Flink cluster using Flink 1.9.2 and was able to achieve
> my goal of reading files continuously at some interval.
> >
> > Flink 1.9.2 can bring up a cluster on Windows.
> >
> > But now I have to upgrade Flink from 1.9.2 to 1.12, and we
> used Docker to bring the cluster up on 1.12 (unlike 1.9.2).
> >
> > I changed the file location from the Windows path to the corresponding
> Docker location, but the same program does not run there.
> >
> > I need help finding a solution.
> >
> > Thanks in advance.
> >
> >
> > Thanks & Regards,
> > Samir Vasani
> >
>