Hi all,
I have a workload where I need to read and transform large amounts of data
from Elasticsearch. I'm currently using Flink only for streaming, but I
thought it could also be a good fit for this kind of batch job.
However, I did not find a way to load data from Elasticsearch into Flink
in parallel.
I'd like to propose an *ElasticsearchInputFormat* that can load data from
Elasticsearch in parallel by leveraging Flink's InputSplit mechanism and
the Elasticsearch scroll API.
The API should look something like this:
ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
    ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
        .setParametersProvider(paramsProvider)
        .setIndex("index-name")
        .setClusterName("type-name")
        .build();
DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
The '*query*' is a regular ES query specifying the data to fetch.
The '*esMapper*' maps the JSON data returned from Elasticsearch to some object
(*MessageObj* in the example above).
To work in parallel, the InputFormat will use InputSplits, with the
'*paramsProvider*' supplying the parameters that determine how a given range
is split across them.
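To make the splitting idea a bit more concrete, here is a rough sketch of what
a parameters provider could compute. It is only an illustration under my own
assumptions: the class RangeSplitSketch, its Range type, and the field name
"timestamp" are hypothetical and not part of Flink or of the proposed API. It
divides an inclusive numeric range into contiguous sub-ranges, one per split,
and renders each as a standard Elasticsearch range filter that a split could
combine with the user's query before opening its own scroll.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: one contiguous sub-range per InputSplit. */
class RangeSplitSketch {

    /** Inclusive bounds handled by a single split (illustrative type). */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        /** Renders the bounds as an Elasticsearch range query on the given field. */
        String toRangeQuery(String field) {
            return "{\"range\":{\"" + field + "\":{\"gte\":" + from + ",\"lte\":" + to + "}}}";
        }
    }

    /**
     * Splits [min, max] into at most numSplits contiguous ranges whose sizes
     * differ by at most one. Assumes max - min + 1 does not overflow a long.
     */
    static List<Range> split(long min, long max, int numSplits) {
        if (numSplits < 1 || max < min) {
            throw new IllegalArgumentException("need numSplits >= 1 and min <= max");
        }
        long total = max - min + 1;
        int n = (int) Math.min(numSplits, total); // never create empty ranges
        long base = total / n;
        long remainder = total % n;

        List<Range> ranges = new ArrayList<>(n);
        long start = min;
        for (int i = 0; i < n; i++) {
            // The first `remainder` ranges take one extra element each.
            long size = base + (i < remainder ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new Range(start, end));
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // "timestamp" is a made-up field name for the example.
        for (Range r : split(0, 99, 4)) {
            System.out.println(r.toRangeQuery("timestamp"));
        }
    }
}
```

Each resulting range would correspond to one InputSplit, so the number of
ranges bounds the read parallelism. How the range bounds are discovered (for
example via a min/max aggregation) is left open here.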
What do you think?
Best,
Michael.