Hi,

If I understand correctly, the problem is accessing local files from
Flink running in Docker.
Have you tried mounting the local directory into the container, for
example as a bind mount [1]?

[1]
https://docs.docker.com/storage/bind-mounts/
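
Once the directory is mounted, the job has to use the container-side
path, because the Windows path (D:\\FLink\\...) does not exist inside the
container. In a typical setup the mount is needed in every container that
reads the files (the TaskManagers). As a rough way to check what the
process actually sees, a plain-Java sketch like the one below might help.
It is only an illustration: the class name, the helper methods, and the
mount target /opt/flink/inputfiles are my assumptions, not something
taken from your setup.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class MountedInputCheck {

    // Hypothetical container-side mount target; replace with your own bind mount target.
    static final String DEFAULT_INPUT_DIR = "/opt/flink/inputfiles";

    /** Returns the directory as a file: URI string, or throws if this process cannot see it. */
    static String resolveInputUri(String dir) {
        Path path = Paths.get(dir).toAbsolutePath().normalize();
        if (!Files.isDirectory(path)) {
            throw new IllegalArgumentException(
                    "Input directory is not visible to this process: " + path);
        }
        return path.toUri().toString();
    }

    /** Lists the names of the regular files directly inside the directory, sorted. */
    static List<String> listFiles(String dir) throws IOException {
        try (Stream<Path> entries = Files.list(Paths.get(dir))) {
            return entries
                    .filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Use the first argument if given, otherwise the assumed mount target.
        String dir = args.length > 0 ? args[0] : DEFAULT_INPUT_DIR;
        System.out.println("Input URI: " + resolveInputUri(dir));
        System.out.println("Files: " + listFiles(dir));
    }
}
```

If this fails inside the container, the mount is probably missing or
points somewhere else. If it succeeds, the printed URI should be usable
in place of localFsURI in the env.readFile(...) call of your job.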

Regards,
Roman

On Mon, Aug 30, 2021 at 3:33 PM Samir Vasani <samirvas...@gmail.com> wrote:
>
> I have a requirement to read files continuously from a specific path.
>
> That is, the Flink job should continuously poll the specified location and
> read each file that arrives there at certain intervals.
>
> Example: my location on a Windows machine is C:/inputfiles, which gets a file
> file_1.txt at 2:00 PM, file_2.txt at 2:30 PM, and file_3.txt at 3:00 PM.
>
> I experimented with this using the code below.
>
> import org.apache.flink.api.common.io.FilePathFilter;
> import org.apache.flink.api.java.io.TextInputFormat;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
>
> public class ContinuousFileProcessingTest {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // Checkpoint every 10 ms.
>         env.enableCheckpointing(10);
>
>         // Directory to monitor for new files.
>         String localFsURI = "D:\\FLink\\2021_01_01\\";
>         TextInputFormat format = new TextInputFormat(new Path(localFsURI));
>         format.setFilesFilter(FilePathFilter.createDefaultFilter());
>
>         // Re-scan the directory every 100 ms and read the files found there.
>         DataStream<String> inputStream =
>                 env.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
>
>         // Upper-case each line, then print it and write it to a file.
>         SingleOutputStreamOperator<String> soso = inputStream.map(String::toUpperCase);
>         soso.print();
>         soso.writeAsText("D:\\FLink\\completed", FileSystem.WriteMode.OVERWRITE);
>
>         env.execute("read and write");
>     }
> }
>
>
>
>
> I brought up a Flink cluster using Flink 1.9.2 and was able to achieve my
> goal of reading files continuously at some interval.
>
> Flink 1.9.2 can bring up a cluster on Windows.
>
> But now I have to upgrade Flink from 1.9.2 to 1.12, and we used
> Docker to bring the cluster up on 1.12 (unlike 1.9.2).
>
> I changed the file location from the Windows path to the Docker location, but
> the same program above is not running there.
>
> I need help finding a solution.
>
> Thanks in advance.
>
>
> Thanks & Regards,
> Samir Vasani
>
