Hi Gordon,

Thanks for the advice. It works perfectly, but only in the Elasticsearch case.

This pom configuration works for Elasticsearch 2.2.1:

<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch</artifactId>
   <version>2.2.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/elasticsearch/**</includes>
</artifactItem>



Why 2.2.1? Because if you check
"flink/flink-streaming-connectors/flink-connector-elasticsearch2/pom.xml"
you will see this line:
"<elasticsearch.version>2.2.1</elasticsearch.version>"


But Gordon, your idea does not work with the twitter connector. I tried
adding this to the pom, and it does not work:

<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-twitter_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>hbc-core</artifactId>
   <version>2.2.0</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>



or this:

<artifactItem>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-twitter_${scala.version}</artifactId>
   <version>1.1-SNAPSHOT</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/flink/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>hbc-core</artifactId>
   <version>2.2.0</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.5</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.twitter</groupId>
   <artifactId>joauth</artifactId>
   <version>6.0.2</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/twitter/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpcore</artifactId>
   <version>4.2.4</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>org/apache/httpcomponents/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <includes>com/google/guava/**</includes>
</artifactItem>



When I run the job, I see this error:

2016-05-12 21:49:37,681 INFO  org.elasticsearch.plugins - [node-1] modules [], plugins [], sites []
2016-05-12 21:49:37,738 INFO  org.apache.flink.runtime.blob.BlobCache - Downloading 5ff307efcde8deebfb2886733e40994c01fbba7d from localhost/127.0.0.1:47639
2016-05-12 21:49:38,109 INFO  org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink - Created Elasticsearch TransportClient org.elasticsearch.client.transport.TransportClient@66cdf89
2016-05-12 21:49:38,114 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Initializing Twitter Streaming API connection
2016-05-12 21:49:38,357 INFO  com.twitter.hbc.httpclient.BasicClient - New connection executed: flink-twitter-source, endpoint: /1.1/statuses/sample.json
2016-05-12 21:49:38,357 INFO  org.apache.flink.streaming.connectors.twitter.TwitterSource - Twitter Streaming API connection established successfully
2016-05-12 21:49:38,376 WARN  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Uncaught exception
java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
        at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
        at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
        at com.twitter.hbc.httpclient.RestartableHttpClient.setup(RestartableHttpClient.java:56)
        at com.twitter.hbc.httpclient.ClientBase.run(ClientBase.java:118)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2016-05-12 21:49:38,379 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source exit event - java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
2016-05-12 21:49:38,380 INFO  com.twitter.hbc.httpclient.ClientBase - flink-twitter-source Shutting down httpclient connection manager




... and finally, if I add "flink-connector-twitter_2.11-1.1-SNAPSHOT.jar"
to this location: flink/build-target/lib/, it works. No idea why :P
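
One thing that may be worth checking in the second list above: <includes> is
matched against file paths inside each jar, not against the Maven groupId.
The classes in httpclient and httpcore live under org/apache/http/, and
Guava's live under com/google/common/, so the patterns
org/apache/httpcomponents/** and com/google/guava/** would probably unpack
nothing from those jars. A sketch of those three items with adjusted
patterns, keeping the same versions and not tested against this build:

```xml
<!-- Sketch only: same artifacts and versions as in the list above.
     Only the <includes> patterns are changed so that they match the
     package paths inside each jar. Not verified against this build. -->
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpclient</artifactId>
   <version>4.2.5</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- httpclient classes are in the package org.apache.http -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>org.apache.httpcomponents</groupId>
   <artifactId>httpcore</artifactId>
   <version>4.2.4</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- httpcore classes are also in the package org.apache.http -->
   <includes>org/apache/http/**</includes>
</artifactItem>
<artifactItem>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>14.0.1</version>
   <type>jar</type>
   <overWrite>false</overWrite>
   <outputDirectory>${project.build.directory}/classes</outputDirectory>
   <!-- Guava classes are in the package com.google.common -->
   <includes>com/google/common/**</includes>
</artifactItem>
```

This may not be enough on its own. The NoSuchMethodError suggests that
classes from two different httpclient versions are mixed on the classpath,
and an older copy that Flink already sees could still be picked up first.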


2016-05-12 0:32 GMT+02:00 Tzu-Li (Gordon) Tai:

> Hi Rafal,
>
> From your description, it seems like Flink is complaining because it cannot
> access the Elasticsearch API related dependencies as well. You'd also have
> to include the following into your Maven build, under <artifactItems>:
>
> <artifactItem>
>     <groupId>org.elasticsearch</groupId>
>     <artifactId>elasticsearch</artifactId>
>     <version>2.3.2</version>
>     <type>jar</type>
>     <overWrite>false</overWrite>
>     <outputDirectory>${project.build.directory}/classes</outputDirectory>
>     <includes>org/elasticsearch/**</includes>
> </artifactItem>
>
> Now your built jar should correctly include all required dependencies (the
> connector & Elasticsearch API).
>
> As explained in "Linking with modules not contained in the binary
> distribution"
> <https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>,
> it is enough to package the dependencies along with your code for Flink
> to access all required dependencies, and you don't need to copy the jar
> to the lib folder. I would recommend cleaning up the lib folder of the
> previous jars you copied and following this approach in the future, just in
> case they mess up the classloader.
>
> As for your first attempt, where Flink could not find any Elasticsearch
> nodes when executed in the IDE, I suspect the reason is that the
> elasticsearch2 connector by default uses version 2.2.1, lower than your
> cluster version 2.3.2. I have previously seen Elasticsearch strangely
> complain about not finding any nodes when the client version is lower
> than the deployment's. Can you try compiling the elasticsearch2 connector
> with the option -Delasticsearch.version=2.3.2, and use the newly built
> connector jar, following the same method mentioned above?
>
> Hope this helps!
>
> Cheers,
> Gordon