Hi all,

I have a workload where I need to read and transform large amounts of data from Elasticsearch. I currently use Flink only for streaming, but I thought it could also be a good fit for this kind of batch job. However, I did not find a way to load data from Elasticsearch into Flink in parallel.
I'd like to propose an *ElasticsearchInputFormat* that can load data from Elasticsearch in parallel by leveraging Flink's InputSplit mechanism together with the Elasticsearch scroll API. The API would look something like this:

ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
    ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
        .setParametersProvider(paramsProvider)
        .setIndex("index-name")
        .setClusterName("cluster-name")
        .build();

DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);

The *query* is a regular Elasticsearch query specifying the data to fetch. The *esMapper* maps the JSON documents returned from Elasticsearch to some object (in the example above, *MessageObj*). To run in parallel, the InputFormat works with InputSplits, and the *paramsProvider* supplies the parameters that describe how to split a given range across those splits; a rough sketch of both pieces follows below.
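To make the proposal a bit more concrete, here is a sketch of the two pluggable pieces. The names and signatures below are only illustrative, not a final API; the provider is similar in spirit to the NumericBetweenParametersProvider used by Flink's JDBCInputFormat:

import java.io.Serializable;
import java.util.Map;

// Illustrative mapper interface: turns one Elasticsearch hit, already
// parsed into a Map, into the user's record type (e.g. MessageObj).
interface ElasticsearchResultMapper<T> extends Serializable {
    T map(Map<String, Object> source);
}

// Illustrative parameters provider: splits a numeric (e.g. timestamp)
// range into one [lower, upper] bound pair per InputSplit, so every
// split scrolls a disjoint slice of the data.
class NumericRangeParametersProvider implements Serializable {

    private final long min;
    private final long max;
    private final int numSplits;

    NumericRangeParametersProvider(long min, long max, int numSplits) {
        this.min = min;
        this.max = max;
        this.numSplits = numSplits;
    }

    // One [lowerBound, upperBound] pair per InputSplit.
    long[][] getParameterValues() {
        long[][] params = new long[numSplits][2];
        long step = (max - min + 1) / numSplits;
        for (int i = 0; i < numSplits; i++) {
            params[i][0] = min + i * step;
            // The last split absorbs any remainder from integer division.
            params[i][1] = (i == numSplits - 1) ? max : min + (i + 1) * step - 1;
        }
        return params;
    }
}

The idea would be that each parallel instance of the InputFormat adds its own bound pair as a range filter to the base query and opens an independent scroll for its slice.

What do you think?

Best,
Michael.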