I tried changing the Elasticsearch version to 2.4.1, which results in a new exception:

Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
    at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:192)
    at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
    at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)

On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Martin,
>
> You can do that by adding a dependency to the Elasticsearch client of your
> desired version in your project.
>
> You can also check what Elasticsearch client version the project is using
> by checking `mvn dependency:tree` from the base directory of your project.
>
> Cheers,
> Gordon
>
>
> On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:
>
> Hej,
>
> thanks for the fast reply.
>
> I'm currently running things from inside my IDE so it should not be a
> packaging problem. That said I added the plugin from the link provided but
> I'm not sure what elastic search library is needed.
>
> Where do I override the elastic search version? The only thing I'm
> currently using is the flink-connector do I have to modify its code?
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
>   <version>1.1.3</version>
> </dependency>
>
>
> One thing I forgot to mention: I can only modify things locally and pack
> them into a jar. I'm stuck with stock Flink 1.1.3 for the execution since
> I'm running things on top of Hopsworks.
>
> cheers Martin
>
> On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>> This could be an Elasticsearch server / client version conflict, or that
>> the uber jar of your code wasn't built properly.
>>
>> For the first possible issue, we're currently using Elasticsearch 2.3.5
>> to build the Flink Elasticsearch Connector. Could you try overriding this
>> version to 2.4.1 when building your code and see if the problem remains?
>>
>> For the second issue, please check out
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.
>>
>> Let me know if the problem remains after trying out the above :-)
>>
>> Cheers,
>> Gordon
>>
>> On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:
>>
>> Hej,
>>
>> I'm trying to write to Elasticsearch from a streaming application and I
>> get a weird error message that I can't decipher. Hopefully, someone here
>> can help me. I'm trying to run the Java example
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/elasticsearch2.html>
>> from the website. I double-checked that I can reach Elasticsearch from the
>> development machine by putting some data in with curl. Does anyone have an
>> idea what the problem is?
>>
>> *Technical info:*
>> Flink 1.1.3
>>
>> Elasticsearch 2.4.1
>>
>> http://bbc2.sics.se:19208/
>>> {
>>>   "name" : "hopsworks",
>>>   "cluster_name" : "hops",
>>>   "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
>>>   "version" : {
>>>     "number" : "2.4.1",
>>>     "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
>>>     "build_timestamp" : "2016-09-27T18:57:55Z",
>>>     "build_snapshot" : false,
>>>     "lucene_version" : "5.5.2"
>>>   },
>>>   "tagline" : "You Know, for Search"
>>> }
>>
>>
>> Changes in the code:
>>
>>> Map<String, String> config = new HashMap<>();
>>> // This instructs the sink to emit after every element, otherwise they would be buffered
>>> config.put("bulk.flush.max.actions", "1");
>>> config.put("cluster.name", "hops");
>>>
>>> ArrayList<InetSocketAddress> transports = new ArrayList<>();
>>> transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
>>
>>
>> Exception:
>>
>>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
>>>     at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
>>>     at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>>     at java.lang.Thread.run(Thread.java:745)
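
A note on the NoSuchMethodError at the top of this mail: MoreExecutors.directExecutor() only exists in Guava 18.0 and later, so the error suggests that an older Guava is being picked up on the classpath ahead of the one the Elasticsearch 2.x client expects. Which dependency drags it in is not established in this thread. The following is a minimal sketch for checking it from inside the IDE, using only the JDK. The class name ClasspathProbe and the helper probe() are hypothetical names made up for illustration; they are not part of Flink, Elasticsearch, or Guava.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of any library): reports which jar
// a class is loaded from and whether it has a public no-argument method.
class ClasspathProbe {

    static String probe(String className, String methodName) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());

            // The code source is null for classes loaded by the bootstrap loader (JDK classes)
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "<bootstrap or unknown>"
                    : src.getLocation().toString();

            boolean found = false;
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName) && m.getParameterTypes().length == 0) {
                    found = true;
                    break;
                }
            }
            return className + " from " + location + ": "
                    + methodName + "() " + (found ? "present" : "MISSING");
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not on classpath (" + e + ")";
        }
    }

    public static void main(String[] args) {
        // The class and method named in the NoSuchMethodError
        System.out.println(probe("com.google.common.util.concurrent.MoreExecutors", "directExecutor"));
    }
}
```

If this reports MISSING, the printed jar location shows which Guava is winning, and `mvn dependency:tree` (as Gordon suggested) should show which dependency pulls it in.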