Hi Palle, you can recursively read all files in a folder as explained in the "Recursive Traversal of the Input Path Directory" section of the Data Source documentation [1].
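For illustration, here is a minimal sketch of the recursive read, assuming the Flink 1.x DataSet API and the "recursive.file.enumeration" parameter described in [1]. The class name, method name, and HDFS path are placeholders made up for this example, not something from your setup.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveRead {

    // Returns every line of every file under inputDir, including files in nested subdirectories.
    public static DataSet<String> readAllLines(ExecutionEnvironment env, String inputDir) {
        Configuration parameters = new Configuration();
        // Parameter name as given in the Data Sources documentation [1].
        parameters.setBoolean("recursive.file.enumeration", true);
        return env.readTextFile(inputDir).withParameters(parameters);
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Hypothetical path; replace it with the real HDFS folder.
        DataSet<String> lines = readAllLines(env, "hdfs:///path/to/json-folder");
        // count() triggers execution of the job.
        System.out.println(lines.count());
    }
}
```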
The easiest way to read line-wise JSON objects is to use ExecutionEnvironment.readTextFile() which reads text files linewise as strings and a subsequent mapper that uses a JSON parser (e.g., Jackson) to parse the JSON strings.
You should use a RichMapFunction and create the parser in the open() method to avoid instantiating a new parser for each incoming line.
After parsing, the RichMapFunction can emit POJOs.

Cheers, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources

2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <[email protected]>:

> I had the same issue :)
> I resolved it reading all file paths in a collection, then using this code:
>
> env.fromCollection(filePaths).rebalance().map(file2pojo)
>
> You can have your dataset of Pojos!
>
> The rebalance() is necessary to exploit parallelism, otherwise the pipeline
> will be executed with parallelism 1.
>
> Best,
> Flavio
> On 7 May 2016 12:13, "Palle" <[email protected]> wrote:
>
> Hi there.
>
> I've got a HDFS folder containing a lot of files. All files contains a lot
> of JSON objects, one for each line. I will have several TB in the HDFS
> folder.
>
> My plan is to make Flink read all files and all JSON objects and then do
> some analysis on the data, actually very similar to the
> flatMap/groupBy/reduceGroup transformations that is done in the WordCount
> example.
>
> But I am a bit stuck, because I cannot seem to find out how to make Flink
> read all files in a HDFS dir and then perform the transformations on the
> data. I have googled quite a bit and also looked in the Flink API and mail
> history.
>
> Can anyone point me to an example where Flink is used to read all files in
> a HDFS folder and then do transformations on the data)?
>
> - and a second question: Is there an elegant way to make Flink handle the
> JSON objects? - can they be converted to POJOs by something similar to the
> pojoType() method?
>
> /Palle
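
P.S. A minimal sketch of the readTextFile() plus RichMapFunction approach from the reply above, assuming Flink 1.x (where open() takes a Configuration) and Jackson's ObjectMapper. The Event POJO and its fields are made up for this example; the real fields depend on what the JSON objects contain.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JsonLineParser extends RichMapFunction<String, JsonLineParser.Event> {

    // Hypothetical POJO: a public class with a public no-argument constructor and public fields.
    public static class Event {
        public String user;
        public long count;

        public Event() {}
    }

    // Transient so the parser is not shipped with the function; it is created per task in open().
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        // Called once per parallel task instance, before the first call to map().
        mapper = new ObjectMapper();
    }

    @Override
    public Event map(String line) throws Exception {
        // Each input line is expected to hold exactly one JSON object.
        return mapper.readValue(line, Event.class);
    }
}
```

It would then be applied to the lines read from the folder with something like env.readTextFile(path).map(new JsonLineParser()). A malformed line makes readValue() throw and fails the job, so a flatMap that skips bad lines may be preferable for messy input.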
