Actually, does it not work if you just pass the directory to env.readTextFile,
as in your code example? If not, what exactly is the problem?

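The list-then-load approach from the quoted message below can be sketched in
plain Java. This is only an illustration under assumptions: the local
filesystem stands in for HDFS, java.util.stream stands in for the Flink
operators, each line is treated as one record, and the class and method names
(ListThenLoadSketch, listJsonFiles, loadRecords, readAll) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListThenLoadSketch {

    // Step 1: walk the directory tree (including partition subfolders)
    // and collect the paths of all regular files ending in ".json".
    public static List<String> listJsonFiles(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .map(Path::toString)
                        .filter(p -> p.endsWith(".json"))
                        .sorted()
                        .collect(Collectors.toList());
        }
    }

    // Step 2: load one file fully into memory and return its lines as records.
    // Assumption: one record per line; only reasonable for small files.
    public static List<String> loadRecords(String path) {
        try {
            return Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Both steps combined: the path list plays the role of the source
    // collection, and the flatMap plays the role of the loading operator.
    public static List<String> readAll(Path root) throws IOException {
        return listJsonFiles(root).stream()
                .flatMap(p -> loadRecords(p).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ListThenLoadSketch <root-directory>");
            return;
        }
        readAll(Paths.get(args[0])).forEach(System.out::println);
    }
}
```

In a Flink job, the list of paths would be handed to env.fromCollection and
the per-file loading would run inside a map or flatMap operator, so the files
are read in parallel by the workers rather than in one JVM.
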
> On 12 Dec 2018, at 17:24, Andrey Zagrebin <and...@data-artisans.com> wrote:
> 
> Hi,
> 
> If the question is how to read all files from an HDFS directory:
> in general, each file is potentially a different DataSet (not DataStream).
> You need to decide how to combine/join them in the Flink pipeline.
> 
> If the files are small enough, you could list them as string paths and use
> env.fromCollection to start the pipeline.
> Then, in a map operation, manually load the file for each path into memory
> and transform its contents into records for the next stage.
> 
> Best,
> Andrey
> 
>> On 12 Dec 2018, at 15:02, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I want to read all JSON files from HDFS, including those in partition folders.
>> 
>>      public static void main(String[] args) {
>> 
>>              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>              // path:
>>              // hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
>> 
>>              DataStream<String> df = env.readTextFile("hdfs://localhost:8020/data/ingestion/ingestion.raw.product");
>>              try {
>>                      df.print();
>>                      env.execute("dfg");
>>              } catch (Exception e) {
>>                      e.printStackTrace();
>>              }
>> 
>>      }
> 
