Hi Michael,

Have you considered trying out the EsInputFormat [1] with Flink's
HadoopInputFormatBase? That way, reading from Elasticsearch might already
work out of the box. If not, then adding a dedicated Elasticsearch input
format would definitely be helpful.

[1] https://github.com/elastic/elasticsearch-hadoop
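For reference, wiring EsInputFormat into Flink's Hadoop compatibility layer could look roughly like the sketch below. It is untested and assumes the mapred flavor of elasticsearch-hadoop; the host, index/type, and query values are placeholders:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class EsBatchRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // elasticsearch-hadoop is configured via Hadoop properties;
        // these values are placeholders for illustration only.
        JobConf jobConf = new JobConf();
        jobConf.set("es.nodes", "localhost:9200");
        jobConf.set("es.resource", "index-name/type-name");
        jobConf.set("es.query", "?q=*");

        // Wrap the Hadoop InputFormat so Flink can create input splits
        // from it; EsInputFormat emits (docId, fields) pairs.
        HadoopInputFormat<Text, LinkedMapWritable> inputFormat =
            new HadoopInputFormat<>(
                new EsInputFormat<Text, LinkedMapWritable>(),
                Text.class, LinkedMapWritable.class, jobConf);

        DataSet<Tuple2<Text, LinkedMapWritable>> input =
            env.createInput(inputFormat);
        input.first(10).print();
    }
}
```

If that works, the LinkedMapWritable values could then be converted to a POJO in a subsequent map() step.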

On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman <gen...@gmail.com> wrote:

> Hi all,
>
> I have a workload where I need to read and transform large amounts of data
> from Elasticsearch. I'm currently using Flink only for streaming, but I
> thought that it could also be a good fit for this kind of batch job.
> However, I did not find a way to load data from Elasticsearch in parallel
> to Flink.
>
> I'd like to propose *ElasticsearchInputFormat* which will be able to load
> data from Elasticsearch in parallel by leveraging the InputSplit mechanism
> in Flink and the Elasticsearch scroll API.
>
> The API should look something like this:
> ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
>     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
>         .setParametersProvider(paramsProvider)
>         .setIndex("index-name")
>         .setType("type-name")
>         .build();
> DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
>
> The *query* is a regular ES query specifying the data to fetch.
> The *esMapper* maps the JSON returned from Elasticsearch to some object
> (in the example above, *MessageObj*).
> In order for it to work in parallel, the InputFormat will work with
> InputSplits, which will get parameters on how to split a certain range
> from the *paramsProvider*.
>
> What do you think?
>
> Best,
> Michael.
>
