That's correct, Flavio. The issue has been reported as https://issues.apache.org/jira/browse/FLINK-7386
Best, Fabian

2017-08-30 9:21 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I also had problems with ES 5.4.3 and I had to modify the connector
> code...I fear that the code is compatible only up to ES 5.2 or similar..
>
> On Wed, Aug 30, 2017 at 5:40 AM, Raj Kumar <smallthings1...@gmail.com>
> wrote:
>
>> Hi,
>> I am using elasticsearch 5.4.3 version in my flink project(flink version
>> 1.3.1)
>> Details
>> 1. Using Maven build tool.
>> 2. Running from intellij IDE.
>> 3. Elasticsearch is running on the local machine.
>>
>> Have added the following maven dependency
>>
>> <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
>>     <version>1.3.1</version>
>> </dependency>
>>
>>
>> *code added*
>>
>> Map<String, String> config = new HashMap<>();
>> config.put("cluster.name", "elasticsearch");
>> config.put("bulk.flush.max.actions", "1");
>>
>> List<InetSocketAddress> transportAddresses = new ArrayList<>();
>> transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
>>
>> alerts.addSink(new ElasticsearchSink<AggResult>(config, transportAddresses, new ElasticsearchSinkFunction<AggResult>() {
>>     public IndexRequest createIndexRequest(AggResult aggResult){
>>         Map<String, Long> json = new HashMap<>();
>>         json.put("totalCount", aggResult.getTotalCount());
>>
>>         return Requests
>>                 .indexRequest()
>>                 .index("logdata")
>>                 .type("consolidatedStreamData")
>>                 .source(json);
>>
>>     }
>>     @Override
>>     public void process(AggResult aggResult, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
>>         requestIndexer.add(createIndexRequest(aggResult));
>>     }
>> }));
>>
>>
>>
>> *This results in the following error.*
>>
>> Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.add(Lorg/elasticsearch/action/ActionRequest;)Lorg/elasticsearch/action/bulk/BulkProcessor;
>>     at org.apache.flink.streaming.connectors.elasticsearch.BulkProcessorIndexer.add(BulkProcessorIndexer.java:52)
>>     at ECSPrototype$2.process(ECSPrototype.java:148)
>>     at ECSPrototype$2.process(ECSPrototype.java:134)
>>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:282)
>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>     at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:327)
>>     at ECSPrototype$FlinkFinalProcess.processElement(MyPrototype.java:303)
>>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:94)
>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>>
>> Anyidea what is wrong here ?
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 1823908
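
A NoSuchMethodError like the one in the quoted trace means that the connector was compiled against a BulkProcessor that has add(ActionRequest), while the BulkProcessor class loaded at runtime does not have a method with that exact signature. One way to confirm this from inside the project is a small reflection probe, sketched below. The class name ClasspathMethodProbe and the helper hasPublicMethod are hypothetical names made up for this illustration; the Elasticsearch class names are copied from the stack trace. It assumes it is run with the same classpath as the Flink job.

```java
/**
 * Hypothetical diagnostic helper (not part of Flink or Elasticsearch):
 * checks whether a public method with the given name and exact parameter
 * types is visible on the current classpath.
 */
final class ClasspathMethodProbe {

    /**
     * Returns true if className has a public method called methodName whose
     * parameter types are exactly paramClassNames. Returns false if the class,
     * any parameter class, or the method cannot be found.
     *
     * Note: this compares name and parameter types only. When linking, the JVM
     * also matches the return type, which this sketch does not check.
     */
    static boolean hasPublicMethod(String className, String methodName,
                                   String... paramClassNames) {
        ClassLoader loader = ClasspathMethodProbe.class.getClassLoader();
        try {
            // Load without running static initializers; we only inspect signatures.
            Class<?> target = Class.forName(className, false, loader);
            Class<?>[] paramTypes = new Class<?>[paramClassNames.length];
            for (int i = 0; i < paramClassNames.length; i++) {
                paramTypes[i] = Class.forName(paramClassNames[i], false, loader);
            }
            target.getMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Signature taken from the NoSuchMethodError message in the thread.
        boolean present = hasPublicMethod(
                "org.elasticsearch.action.bulk.BulkProcessor",
                "add",
                "org.elasticsearch.action.ActionRequest");
        System.out.println("BulkProcessor.add(ActionRequest) on classpath: " + present);
    }
}
```

If the probe prints false while the Elasticsearch client jar is on the classpath, the client version resolved by Maven is most likely newer than the one the connector was built against, which matches what FLINK-7386 describes. Running `mvn dependency:tree` shows which Elasticsearch version was actually resolved.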