Hi Palle,

You can recursively read all files in a folder as explained in the
"Recursive Traversal of the Input Path Directory" section of the Data
Source documentation [1].
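
As a rough sketch of what that section describes (assuming the Flink 1.x
DataSet API; the class name RecursiveRead, the helper readRecursively() and
the HDFS path are placeholders I made up for illustration), enabling
recursive enumeration looks roughly like this:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RecursiveRead {

    // Hypothetical helper: reads every file below inputPath as lines of text,
    // including files in nested subfolders.
    public static DataSet<String> readRecursively(ExecutionEnvironment env, String inputPath) {
        Configuration parameters = new Configuration();
        // Parameter name as given in the Data Sources documentation.
        parameters.setBoolean("recursive.file.enumeration", true);
        return env.readTextFile(inputPath).withParameters(parameters);
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Placeholder path; replace it with your own HDFS folder.
        DataSet<String> lines = readRecursively(env, "hdfs:///path/to/json-folder");
        // print() triggers execution in the DataSet API.
        lines.first(10).print();
    }
}
```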

The easiest way to read line-wise JSON objects is to use
ExecutionEnvironment.readTextFile(), which reads text files line by line as
strings, followed by a mapper that uses a JSON parser (e.g., Jackson) to
parse the JSON strings. You should use a RichMapFunction and create the
parser in the open() method to avoid instantiating a new parser for each
incoming line. After parsing, the RichMapFunction can emit POJOs.
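
A minimal sketch of such a mapper, assuming the Flink 1.x DataSet API and
Jackson 2.x on the classpath. The Event POJO, its fields, the class names
and the HDFS path are hypothetical and only there for illustration; adapt
them to your actual JSON schema:

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class JsonLinesJob {

    // Hypothetical POJO: public class, public no-arg constructor, public fields.
    public static class Event {
        public String user;
        public long count;

        public Event() {}
    }

    // Parses one JSON line into one Event.
    public static class JsonToEvent extends RichMapFunction<String, Event> {

        // transient: the parser is created on the worker, not shipped with the function.
        private transient ObjectMapper mapper;

        @Override
        public void open(Configuration parameters) {
            // Created once per parallel task instead of once per line.
            mapper = new ObjectMapper();
            // Assumption: JSON fields that Event does not declare should be ignored.
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }

        @Override
        public Event map(String line) throws Exception {
            return mapper.readValue(line, Event.class);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Event> events = env
            .readTextFile("hdfs:///path/to/json-folder") // placeholder path
            .map(new JsonToEvent());

        // print() triggers execution in the DataSet API.
        events.first(10).print();
    }
}
```

From there, events is a regular DataSet of POJOs, so the usual
flatMap/groupBy/reduceGroup transformations can be applied to it.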

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources

2016-05-07 12:25 GMT+02:00 Flavio Pompermaier <[email protected]>:

> I had the same issue :)
> I resolved it by reading all file paths into a collection, then using this code:
>
> env.fromCollection(filePaths).rebalance().map(file2pojo)
>
> You can have your dataset of Pojos!
>
> The rebalance() is necessary to exploit parallelism, otherwise the pipeline
> will be executed with parallelism 1.
>
> Best,
> Flavio
> On 7 May 2016 12:13, "Palle" <[email protected]> wrote:
>
> Hi there.
>
> I've got an HDFS folder containing a lot of files. Each file contains a lot
> of JSON objects, one for each line. I will have several TB in the HDFS
> folder.
>
> My plan is to make Flink read all files and all JSON objects and then do
> some analysis on the data, actually very similar to the
> flatMap/groupBy/reduceGroup transformations that are done in the WordCount
> example.
>
> But I am a bit stuck, because I cannot seem to find out how to make Flink
> read all files in an HDFS dir and then perform the transformations on the
> data. I have googled quite a bit and also looked in the Flink API and mail
> history.
>
> Can anyone point me to an example where Flink is used to read all files in
> an HDFS folder and then do transformations on the data?
>
> - and a second question: Is there an elegant way to make Flink handle the
> JSON objects? - can they be converted to POJOs by something similar to the
> pojoType() method?
>
> /Palle
>
>
