Hej,

thanks for the fast reply.

I'm currently running things from inside my IDE, so it should not be a
packaging problem. That said, I added the plugin from the link you provided,
but I'm not sure which Elasticsearch library is needed.

Where do I override the Elasticsearch version? The only dependency I'm
currently using is the Flink connector; do I have to modify its code?

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
    <version>1.1.3</version>
</dependency>
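
Is pinning the client in my own pom.xml enough? My (untested) guess would be
something like the following, assuming Maven then resolves this explicitly
declared version instead of the connector's transitive 2.3.5:

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <!-- overrides the 2.3.5 pulled in transitively by the connector -->
    <version>2.4.1</version>
</dependency>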


One thing I forgot to mention: I can only modify things locally and package
them into a jar. I'm stuck with stock Flink 1.1.3 for execution, since I'm
running things on top of Hopsworks.

Cheers,
Martin

On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi!
>
> This could be an Elasticsearch server/client version conflict, or the uber
> jar of your code wasn’t built properly.
>
> For the first possible issue, we’re currently using Elasticsearch 2.3.5 to
> build the Flink Elasticsearch Connector. Could you try overriding this
> version to 2.4.1 when building your code and see if the problem remains?
>
> For the second issue, please check out
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven
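>
> In short, that page boils down to building a fat jar with the
> maven-shade-plugin. A minimal sketch (the plugin version here is just an
> example):
>
>     <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-shade-plugin</artifactId>
>         <version>2.4.3</version>
>         <executions>
>             <execution>
>                 <!-- bundle all compile-scope dependencies into the job jar -->
>                 <phase>package</phase>
>                 <goals>
>                     <goal>shade</goal>
>                 </goals>
>             </execution>
>         </executions>
>     </plugin>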
>
> Let me know if the problem remains after trying out the above :-)
>
> Cheers,
> Gordon
>
> On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:
>
> Hej,
>
> I'm trying to write to Elasticsearch from a streaming application, and I
> get a weird error message that I can't decipher. Hopefully someone here
> can help me. I'm trying to run the Java example
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/elasticsearch2.html>
> from the website. I double-checked that I can reach Elasticsearch from the
> development machine by putting some data in with curl. Does anyone have an
> idea what the problem is?
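>
> For reference, the check was along these lines (index and type names here
> are just placeholders, not my exact command):
>
>     curl -XPOST 'http://bbc2.sics.se:19208/test-index/test-type' -d '{"field" : "value"}'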
>
> *Technical info:*
> Flink 1.1.3
>
> Elasticsearch 2.4.1
>
> http://bbc2.sics.se:19208/
> {
>   "name" : "hopsworks",
>   "cluster_name" : "hops",
>   "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
>   "version" : {
>     "number" : "2.4.1",
>     "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
>     "build_timestamp" : "2016-09-27T18:57:55Z",
>     "build_snapshot" : false,
>     "lucene_version" : "5.5.2"
>   },
>   "tagline" : "You Know, for Search"
> }
>
>
> Changes in the code:
>
> Map<String, String> config = new HashMap<>();
> // This instructs the sink to emit after every element; otherwise they would be buffered.
> config.put("bulk.flush.max.actions", "1");
> config.put("cluster.name", "hops");
>
> ArrayList<InetSocketAddress> transports = new ArrayList<>();
> transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
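>
> The rest is unchanged from the website example; for completeness, it looks
> roughly like this (a sketch from the documentation, where input is the
> DataStream<String> being written):
>
>     input.addSink(new ElasticsearchSink<>(config, transports,
>             new ElasticsearchSinkFunction<String>() {
>         @Override
>         public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
>             // Wrap each record in an IndexRequest; index/type names are placeholders.
>             Map<String, String> json = new HashMap<>();
>             json.put("data", element);
>             indexer.add(Requests.indexRequest()
>                     .index("my-index")
>                     .type("my-type")
>                     .source(json));
>         }
>     }));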
>
>
>
> Exception:
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
>     at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
>     at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>     at java.lang.Thread.run(Thread.java:745)
>
>
