Hi Till,

Thanks for the great suggestion!
It seems to do the job. Here is a sample of the code:

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap the elasticsearch-hadoop EsInputFormat in Flink's
        // Hadoop compatibility layer.
        EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
        HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
                new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);

        // es.resource is the <index>/<type> to read from;
        // es.query selects the documents to fetch.
        Configuration configuration = hadoopInputFormat.getConfiguration();
        configuration.set("es.resource", "flink-1/flink_t");
        configuration.set("es.query", "?q=*");

        DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);

        List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
        collect.forEach(System.out::println);
    }
}
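
In case it helps anyone following the thread, here is a minimal sketch of pulling a single field out of each hit instead of printing the raw tuples. It goes inside the same main method, after the createInput call, and needs an import for org.apache.flink.api.common.functions.MapFunction; the field name "message" is just an assumption about the index mapping:

        // Hypothetical follow-up: extract one field from every document.
        // LinkedMapWritable holds the document fields as a Map<Writable, Writable>;
        // "message" is an assumed field name, adjust it to your mapping.
        DataSet<String> messages = input.map(
                new MapFunction<Tuple2<Text, LinkedMapWritable>, String>() {
                    @Override
                    public String map(Tuple2<Text, LinkedMapWritable> hit) {
                        return String.valueOf(hit.f1.get(new Text("message")));
                    }
                });
        messages.print();

As far as I can tell, the parallelism comes for free with this approach: elasticsearch-hadoop creates one input split per index shard, and Flink distributes those splits across the available task slots. For the record, this needs both elasticsearch-hadoop and flink-hadoop-compatibility (which provides the HadoopInputFormat wrapper) on the classpath.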


On Mon, Sep 10, 2018 at 9:47 AM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Michael,
>
> have you considered trying out the EsInputFormat [1] with
> Flink's HadoopInputFormatBase? That way reading from ElasticSearch might
> already work out of the box. If not, then adding a dedicated ElasticSearch
> input format would definitely be helpful.
>
> [1] https://github.com/elastic/elasticsearch-hadoop
>
> On Sat, Sep 8, 2018 at 11:48 PM Michael Gendelman <gen...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I have a workload where I need to read and transform large amounts of
> > data from Elasticsearch. I'm currently using Flink only for streaming,
> > but I thought that it could also be a good fit for this kind of batch
> > job. However, I did not find a way to load data from Elasticsearch into
> > Flink in parallel.
> >
> > I'd like to propose *ElasticsearchInputFormat*, which will be able to
> > load data from Elasticsearch in parallel by leveraging Flink's
> > InputSplit mechanism and the Elasticsearch scroll API.
> >
> > The API should look something like this:
> >
> > ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
> >     ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
> >         .setParametersProvider(paramsProvider)
> >         .setIndex("index-name")
> >         .setType("type-name")
> >         .build();
> > DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);
> >
> > The '*query*' is a regular ES query specifying the data to fetch.
> > The '*esMapper*' maps the JSON data returned from Elasticsearch to some
> > object (in the example above, *MessageObj*).
> > To make it work in parallel, the InputFormat will use InputSplits,
> > which get their parameters on how to split a certain range from the
> > '*paramsProvider*'.
> >
> > What do you think?
> >
> > Best,
> > Michael.
> >
>
