Hi Rakesh,

So the problem is that you want your Flink job to monitor the /data/ingestion/ingestion-raw-product path and process new files as they appear, right?

Can you try env.readFile with watchType = FileProcessingMode.PROCESS_CONTINUOUSLY? For an example of how to call it, see how env.readTextFile(String filePath) and env.readTextFile(String filePath, String charsetName) are implemented: both delegate to env.readFile, but with FileProcessingMode.PROCESS_ONCE, which processes only the files currently in the path.
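Roughly, something like the following sketch (untested; the nested-file flag, the 10-second scan interval, and the class/job names are my assumptions, so adjust them to your setup):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorHdfsDirectory {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "hdfs://localhost:8020/data/ingestion/ingestion-raw-product";

        // Enumerate files in nested partition folders like .../2018/12/05/23.
        TextInputFormat format = new TextInputFormat(new Path(path));
        format.setNestedFileEnumeration(true);

        // Re-scan the directory every 10 seconds and emit the lines of any new files.
        DataStream<String> lines = env.readFile(
            format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        lines.print();
        env.execute("monitor-ingestion-raw-product");
    }
}

One caveat to be aware of: in PROCESS_CONTINUOUSLY mode the source tracks file modification times, so a file that is modified or appended to will be reprocessed in full.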
Best,
Andrey

> On 13 Dec 2018, at 07:22, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:
>
> The problem I am facing is that I want to read JSON files present in an HDFS
> partition folder.
>
> Suppose the partition folder is
> /data/ingestion/ingestion-raw-product/2018/12/05/23. To read the JSON files
> from HDFS, I have to read from the base path
> /data/ingestion/ingestion-raw-product, because the folders inside
> /data/ingestion/ingestion-raw-product/ may change when new data comes in.
> For this I have written a Flink program to read all JSON files from HDFS.
>
> Example: the paths are /data/ingestion/ingestion-raw-product/2018/12/05/23 and
> /data/ingestion/ingestion-raw-product/2018/12/12/23.
> To read all the JSON files from these paths in HDFS I have used
> /data/ingestion/ingestion-raw-product/*/*/*/, but I am not able to read them.
>
> So, can you suggest a solution for this?
>
> Thanks,
> Rakesh
>
> On Wed, Dec 12, 2018 at 10:21 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
> Actually, does it not work if you just provide the directory in env.readTextFile,
> as in your code example? Or what is the problem?
>
>> On 12 Dec 2018, at 17:24, Andrey Zagrebin <and...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> If the question is how to read all files from an HDFS directory:
>> in general, each file is potentially a different DataSet (not DataStream).
>> It needs to be decided how to combine/join them in the Flink pipeline.
>>
>> If the files are small enough, you could list them as string paths and use
>> env.fromCollection to start the pipeline. Then just manually load the file
>> into memory for each path in a map operation and transform the file contents
>> into records for the next stage (a rough sketch follows at the end of this mail).
>>
>> Best,
>> Andrey
>>
>>> On 12 Dec 2018, at 15:02, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I wanted to read all JSON files from HDFS with partition folders.
>>>
>>> public static void main(String[] args) {
>>>     StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>     // path:
>>>     // hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
>>>     DataStream<String> df = env.readTextFile(
>>>         "hdfs://localhost:8020/data/ingestion/ingestion.raw.product");
>>>     try {
>>>         df.print();
>>>         env.execute("dfg");
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>     }
>>> }
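PS: A rough sketch of the env.fromCollection approach quoted above, in case it is still useful. The part-*.json file names under the partition folders are hypothetical, and the whole-file read assumes the files are small enough to fit in memory:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadListedJsonFiles {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // List the file paths up front (the part-0.json names are made up).
        List<String> paths = Arrays.asList(
            "hdfs://localhost:8020/data/ingestion/ingestion-raw-product/2018/12/05/23/part-0.json",
            "hdfs://localhost:8020/data/ingestion/ingestion-raw-product/2018/12/12/23/part-0.json");

        // One record per path; the map operator loads each file into memory.
        DataStream<String> contents = env.fromCollection(paths)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String pathString) throws Exception {
                    Path path = new Path(pathString);
                    FileSystem fs = path.getFileSystem();
                    byte[] buf = new byte[(int) fs.getFileStatus(path).getLen()];
                    try (FSDataInputStream in = fs.open(path)) {
                        int off = 0;
                        while (off < buf.length) {
                            int n = in.read(buf, off, buf.length - off);
                            if (n < 0) {
                                break;
                            }
                            off += n;
                        }
                    }
                    return new String(buf, StandardCharsets.UTF_8);
                }
            });

        contents.print();
        env.execute("read-listed-json-files");
    }
}

For monitoring a directory for new files, though, I would still go with readFile and PROCESS_CONTINUOUSLY as above.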