Hi Rakesh,

So the problem is that you want your Flink job to monitor ' 
/data/ingestion/ingestion-raw-product’ path for new files inside and process 
them when they appear, right?

Can you try env.readFile but with watchType = 
FileProcessingMode.PROCESS_CONTINUOUSLY?
You can see an example in how env.readTextFile(String filePath) and 
env.readTextFile(String filePath, String charsetName) are implemented based on 
env.readFile but with FileProcessingMode.PROCESS_ONCE which processes only the 
files which are currently in the path.

Best,
Andrey

> On 13 Dec 2018, at 07:22, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:
> 
> problem i am facing is that i wanted to read json files present in hdfs 
> partition folder.
> 
> suppose hdfs partition folder is 
> /data/ingestion/ingestion-raw-product/2018/12/05/23. so if wanted to read 
> json files from hdfs i have read it from this path 
> /data/ingestion/ingestion-raw-product because folder present inside  
> /data/ingestion/ingestion-raw-product/ may changes if some new  data came. 
> so, for this i have written flink program to read all json files from hdfs.
> 
> Example:  path is /data/ingestion/ingestion-raw-product/2018/12/05/23 and 
> /data/ingestion/ingestion-raw-product/2018/12/12/23 
> so for reading all the json files from this path present in hdfs i have used 
> /data/ingestion/ingestion-raw-product/*/*/*/,  but i am not able to read it.
> 
> So, can you suggest me solution for this.
> 
> Thanks,
> Rakesh
> 
> On Wed, Dec 12, 2018 at 10:21 PM Andrey Zagrebin <and...@data-artisans.com 
> <mailto:and...@data-artisans.com>> wrote:
> Actually, does it not work if you just provide directory in env.readTextFile 
> as in your code example or what is the problem?
> 
>> On 12 Dec 2018, at 17:24, Andrey Zagrebin <and...@data-artisans.com 
>> <mailto:and...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> If the question is how to read all files from hdfs directory,
>> in general, each file is potentially a different DataSet (not DataStream). 
>> It needs to be decided how to combine/join them in Flink pipeline.
>> 
>> If the files are small enough, you could list them as string paths and use 
>> env.fromCollection to start the pipeline.
>> Next just manually load file into memory for each path in map operation and 
>> transform file contents into records for the next stage.
>> 
>> Best,
>> Andrey
>> 
>>> On 12 Dec 2018, at 15:02, Rakesh Kumar <rakkukumar2...@gmail.com 
>>> <mailto:rakkukumar2...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I wanted to read all json files from hdfs with partition folder.
>>> 
>>>     public static void main(String[] args) {
>>> 
>>>             StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>             //path
>>>             
>>> //hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
>>> 
>>>             DataStream<String> df = 
>>> env.readTextFile("hdfs://localhost:8020/data/ingestion/ingestion.raw.product
>>>  <>");
>>>             try {
>>>                     df.print();
>>>                     env.execute("dfg");
>>>             } catch (Exception e) {
>>>                     e.printStackTrace();
>>>             }
>>> 
>>>     }
>> 
> 

Reply via email to