Actually, does it not work if you just pass the directory to env.readTextFile, as in your code example? What exactly is the problem?
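If it does not, it may be because the TextInputFormat behind readTextFile does not descend into the nested date partition folders (2018/12/05/23) by default. A minimal sketch of a possible workaround, untested against your setup, is to use readFile with nested file enumeration enabled:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadPartitionedDir {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "hdfs://localhost:8020/data/ingestion/ingestion.raw.product";

        // TextInputFormat does not recurse into subdirectories by default;
        // nested enumeration makes it pick up the year/month/day/hour
        // partition folders as well.
        TextInputFormat format = new TextInputFormat(new Path(path));
        format.setNestedFileEnumeration(true);

        DataStream<String> lines = env.readFile(
                format,
                path,
                FileProcessingMode.PROCESS_ONCE, // read the current contents once and stop
                -1);                             // watch interval, unused with PROCESS_ONCE

        lines.print();
        env.execute("read nested json partitions");
    }
}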
> On 12 Dec 2018, at 17:24, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi,
>
> If the question is how to read all files from an hdfs directory:
> in general, each file is potentially a different DataSet (not DataStream),
> and it needs to be decided how to combine/join them in the Flink pipeline.
>
> If the files are small enough, you could list them as string paths and use
> env.fromCollection to start the pipeline.
> Next, just manually load the file into memory for each path in a map
> operation and transform the file contents into records for the next stage.
>
> Best,
> Andrey
>
>> On 12 Dec 2018, at 15:02, Rakesh Kumar <rakkukumar2...@gmail.com> wrote:
>>
>> Hi,
>>
>> I wanted to read all json files from hdfs with partition folders.
>>
>> public static void main(String[] args) {
>>
>>     StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     // path:
>>     // hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23
>>
>>     DataStream<String> df = env.readTextFile(
>>             "hdfs://localhost:8020/data/ingestion/ingestion.raw.product");
>>     try {
>>         df.print();
>>         env.execute("dfg");
>>     } catch (Exception e) {
>>         e.printStackTrace();
>>     }
>> }
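For reference, Andrey's fromCollection approach could look roughly like the sketch below. The part-0.json file name is made up, and it assumes the files are small enough to be read in full inside a single task:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadListedFiles {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // List the input files up front as plain string paths
        // (the file name below is hypothetical).
        List<String> paths = Arrays.asList(
                "hdfs://localhost:8020/data/ingestion/ingestion.raw.product/2018/12/05/23/part-0.json");

        DataStream<String> records = env
                .fromCollection(paths)
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String pathString, Collector<String> out) throws Exception {
                        // Load each file through Flink's FileSystem abstraction
                        // and emit its contents record by record.
                        Path path = new Path(pathString);
                        FileSystem fs = path.getFileSystem();
                        try (BufferedReader reader = new BufferedReader(
                                new InputStreamReader(fs.open(path), StandardCharsets.UTF_8))) {
                            String line;
                            while ((line = reader.readLine()) != null) {
                                out.collect(line); // one json record per line
                            }
                        }
                    }
                });

        records.print();
        env.execute("read listed json files");
    }
}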