Hi Michael,

have you considered trying out the EsInputFormat [1] with Flink's HadoopInputFormatBase? That way, reading from Elasticsearch might already work out of the box. If not, then adding a dedicated Elasticsearch input format would definitely be helpful.

[1] https://github.com/elastic/elasticsearch-hadoop

On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman <gen...@gmail.com> wrote:

> Hi all,
>
> I have a workload where I need to read and transform large amounts of data
> from Elasticsearch. I'm currently using Flink only for streaming but I
> thought that it can also be a good fit for this kind of batch job.
> However, I did not find a way to load data from Elasticsearch in parallel
> to Flink.
>
> I'd like to propose *ElasticsearchInputFormat* which will be able to load
> data from Elasticsearch in parallel by leveraging the InputSplit mechanism
> in Flink and the Elasticsearch scroll API.
>
> The API should look something like this:
> ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
>     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
>         .setParametersProvider(paramsProvider)
>         .setIndex("index-name")
>         .setClusterName("type-name")
>         .build();
> DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
>
> The '*query*' is a regular ES query specifying the data to fetch.
> The '*esMapper*' maps JSON data returned from Elasticsearch to some object
> (in the example above, *MessageObj*).
> In order for it to work in parallel the InputFormat will work with an
> InputSplit which will get parameters on how to split a certain range using
> the '*paramsProvider*'.
>
> What do you think?
>
> Best,
> Michael.
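
For reference, the range-splitting idea in the quoted proposal could look roughly like the sketch below. It is plain Java with no Flink or Elasticsearch dependency, and it is not the API of the proposed ElasticsearchInputFormat: the names RangeSplitSketch, RangeSplit, createSplits and the "timestamp" field are made up for illustration. It assumes the data can be partitioned on a numeric field with known bounds, and that each split contributes a standard Elasticsearch range filter (gte/lt) that a reader would combine with the user's query before opening its own scroll.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitSketch {

    /** One unit of parallel work: the half-open range [from, to) on a numeric field. */
    static final class RangeSplit {
        final int splitNumber;
        final long from;
        final long to;

        RangeSplit(int splitNumber, long from, long to) {
            this.splitNumber = splitNumber;
            this.from = from;
            this.to = to;
        }

        /** Builds an Elasticsearch range filter covering exactly this split. */
        String toRangeFilter(String field) {
            return "{\"range\":{\"" + field + "\":{\"gte\":" + from + ",\"lt\":" + to + "}}}";
        }
    }

    /**
     * Cuts [min, max) into at most numSplits contiguous, non-overlapping ranges.
     * Empty ranges are skipped, so fewer splits come back when the range is
     * narrower than numSplits. Assumes (max - min) * numSplits fits in a long.
     */
    static List<RangeSplit> createSplits(long min, long max, int numSplits) {
        if (numSplits < 1 || max <= min) {
            throw new IllegalArgumentException("need numSplits >= 1 and max > min");
        }
        List<RangeSplit> splits = new ArrayList<>();
        long total = max - min;
        for (int i = 0; i < numSplits; i++) {
            long from = min + total * i / numSplits;
            long to = min + total * (i + 1) / numSplits;
            if (to > from) {
                splits.add(new RangeSplit(splits.size(), from, to));
            }
        }
        return splits;
    }

    public static void main(String[] args) {
        // Each parallel reader would take one split and scroll over its filter.
        for (RangeSplit split : createSplits(0, 1000, 4)) {
            System.out.println(split.splitNumber + ": " + split.toRangeFilter("timestamp"));
        }
    }
}
```

In an actual InputFormat, something like createSplits would sit behind createInputSplits, and each parallel task would open a scroll for the range of the split it was assigned. Half-open ranges keep a document on a boundary from being read twice.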