Hej, thanks for the fast reply.
I'm currently running things from inside my IDE so it should not be a packaging problem. That said I added the plugin from the link provided but I'm not sure what elastic search library is needed. Where do I override the elastic search version? The only thing I'm currently using is the flink-connector do I have to modify its code? <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-elasticsearch2_2.10</artifactId> > <version>1.1.3</version> > </dependency> One thing I forgot to mention, I can only modify things locally packing it into a jar. I'm stuck with stock Flink 1.1.3 for the execution since I'm running things on top of Hopsworks. cheers Martin On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > Hi! > > This could be a Elasticsearch server / client version conflict, or that > the uber jar of your code wasn’t built properly. > > For the first possible issue, we’re currently using Elasticsearch 2.3.5 to > build the Flink Elasticsearch Connector. Could you try overriding this > version to 2.4.1 when building your code and see if the problem remains? > > For the second issue, please check out https://ci.apache.org/ > projects/flink/flink-docs-release-1.3/dev/linking.html# > packaging-dependencies-with-your-usercode-with-maven. > > Let me know if the problem remains after trying out the above :-) > > Cheers, > Gordon > > On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote: > > Hej, > > I'm trying to write to elastic search from a streaming application and I > get a weird error message I that I can't decipher. Hopefully, someone here > can help me. I'm trying to run the java example > <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/elasticsearch2.html> > from > the website.I doublechecked that I can reach the elastic search from the > development machine by putting some data in with curl. Has anyone an idea > what the problem is? > > *Technical info:* > Flink 1.1.3 > > Elasticsearch 2.4.1 > > http://bbc2.sics.se:19208/ >> { >> "name" : "hopsworks", >> "cluster_name" : "hops", >> "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw", >> "version" : { >> "number" : "2.4.1", >> "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16", >> "build_timestamp" : "2016-09-27T18:57:55Z", >> "build_snapshot" : false, >> "lucene_version" : "5.5.2" >> }, >> "tagline" : "You Know, for Search" >> } > > > Changes in the code: > > Map<String, String> config = new HashMap<>(); >> > // This instructs the sink to emit after every element, otherwise they >> would be buffered > > config.put("bulk.flush.max.actions", "1"); >> > config.put("cluster.name", "hops"); >> > >> ArrayList<InetSocketAddress> transports = new ArrayList<>(); >> > transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se") >> , 19208)); > > > > Exception: > > Caused by: java.lang.NoClassDefFoundError: Could not initialize class >> org.elasticsearch.threadpool.ThreadPool >> at org.elasticsearch.client.transport.TransportClient$ >> Builder.build(TransportClient.java:133) >> at org.apache.flink.streaming.connectors.elasticsearch2. >> ElasticsearchSink.open(ElasticsearchSink.java:164) >> at org.apache.flink.api.common.functions.util.FunctionUtils. >> openFunction(FunctionUtils.java:38) >> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator. >> open(AbstractUdfStreamOperator.java:91) >> at org.apache.flink.streaming.runtime.tasks.StreamTask. >> openAllOperators(StreamTask.java:376) >> at org.apache.flink.streaming.runtime.tasks.StreamTask. >> invoke(StreamTask.java:256) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585) >> at java.lang.Thread.run(Thread.java:745) > >