I tried changing the Elasticsearch version to 2.4.1, which results in a
new exception:

> Caused by: java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
> at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:192)
> at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
> at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
> at java.lang.Thread.run(Thread.java:745)
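
A NoSuchMethodError on MoreExecutors.directExecutor() usually means that an older Guava is being picked up ahead of the one the Elasticsearch client expects (as far as I know, directExecutor() was added in Guava 18.0). Below is a minimal sketch for checking this from inside the IDE. It assumes nothing beyond the JDK, and the class name ClasspathProbe and its two helpers are made up for illustration. It only reports where a class is loaded from and whether it exposes a given method; it does not fix the conflict.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class ClasspathProbe {

    // Returns the jar or directory a class was loaded from, or a short note
    // if the class is missing or has no recorded code source.
    public static String locate(String className) {
        try {
            // initialize=false: load the class without running static initializers
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap or unknown source)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    // Returns true if the class can be loaded and has a public method with this name.
    public static boolean hasPublicMethod(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The class and method named in the stack trace above
        String guava = "com.google.common.util.concurrent.MoreExecutors";
        System.out.println("loaded from: " + locate(guava));
        System.out.println("has directExecutor(): " + hasPublicMethod(guava, "directExecutor"));
    }
}
```

If the reported location is not the Guava jar you expected, or the method is missing, the output of `mvn dependency:tree` should show which dependency brings in the older Guava.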



On Wed, Mar 1, 2017 at 7:58 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Martin,
>
> You can do that by adding a dependency to the Elasticsearch client of your
> desired version in your project.
>
> You can also see which Elasticsearch client version the project is using
> by running `mvn dependency:tree` from the base directory of your project.
>
> Cheers,
> Gordon
>
>
> On March 1, 2017 at 1:21:56 AM, Martin Neumann (mneum...@sics.se) wrote:
>
> Hej,
>
> thanks for the fast reply.
>
> I'm currently running things from inside my IDE, so it should not be a
> packaging problem. That said, I added the plugin from the link provided, but
> I'm not sure which Elasticsearch library is needed.
>
> Where do I override the Elasticsearch version? The only thing I'm
> currently using is the Flink connector. Do I have to modify its code?
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
>   <version>1.1.3</version>
> </dependency>
>
>
> One thing I forgot to mention: I can only change things locally and pack
> them into a jar. I'm stuck with stock Flink 1.1.3 for the execution, since
> I'm running things on top of Hopsworks.
>
> cheers Martin
>
> On Tue, Feb 28, 2017 at 5:42 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>> This could be an Elasticsearch server / client version conflict, or the
>> uber jar of your code wasn’t built properly.
>>
>> For the first possible issue, we’re currently using Elasticsearch 2.3.5
>> to build the Flink Elasticsearch Connector. Could you try overriding this
>> version to 2.4.1 when building your code and see if the problem remains?
>>
>> For the second issue, please check out
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html#packaging-dependencies-with-your-usercode-with-maven.
>>
>> Let me know if the problem remains after trying out the above :-)
>>
>> Cheers,
>> Gordon
>>
>> On March 1, 2017 at 12:24:08 AM, Martin Neumann (mneum...@sics.se) wrote:
>>
>> Hej,
>>
>> I'm trying to write to Elasticsearch from a streaming application and I
>> get a weird error message that I can't decipher. Hopefully, someone here
>> can help me. I'm trying to run the Java example
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/elasticsearch2.html>
>> from the website. I double-checked that I can reach Elasticsearch from the
>> development machine by putting some data in with curl. Does anyone have an
>> idea what the problem is?
>>
>> *Technical info:*
>> Flink 1.1.3
>>
>> Elasticsearch 2.4.1
>>
>> http://bbc2.sics.se:19208/
>> {
>>   "name" : "hopsworks",
>>   "cluster_name" : "hops",
>>   "cluster_uuid" : "XIVrGHeaTc2nICQC85chpw",
>>   "version" : {
>>     "number" : "2.4.1",
>>     "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
>>     "build_timestamp" : "2016-09-27T18:57:55Z",
>>     "build_snapshot" : false,
>>     "lucene_version" : "5.5.2"
>>   },
>>   "tagline" : "You Know, for Search"
>> }
>>
>>
>> Changes in the code:
>>
>> Map<String, String> config = new HashMap<>();
>> // This instructs the sink to emit after every element, otherwise they would be buffered
>> config.put("bulk.flush.max.actions", "1");
>> config.put("cluster.name", "hops");
>>
>> ArrayList<InetSocketAddress> transports = new ArrayList<>();
>> transports.add(new InetSocketAddress(InetAddress.getByName("bbc2.sics.se"), 19208));
>>
>>
>>
>> Exception:
>>
>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.threadpool.ThreadPool
>> at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
>> at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>
