Thanks a lot for all the answers :)

Last time I wrote this email because I didn't understand the difference
between running on a LOCAL cluster (one node) and running in IntelliJ IDEA.
Now I know :P

https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html


If you read the section *"Linking with modules not contained in the binary
distribution"* and check the pom in the flink directory, it doesn't have this:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

So Flink doesn't contain the CONNECTORS (Twitter, Elasticsearch, etc.) in the
binary distribution. Now I understand, and I added this to *my pom*:


<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-dependency-plugin</artifactId>
   <version>2.9</version>
   <executions>
      <execution>
         <id>unpack</id>
         <!-- executed just before the package phase -->
         <phase>prepare-package</phase>
         <goals>
            <goal>unpack</goal>
         </goals>
         <configuration>
            <artifactItems>
               <!-- For Flink connector classes -->
               <artifactItem>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-twitter_${scala.version}</artifactId>
                  <version>1.1-SNAPSHOT</version>
                  <type>jar</type>
                  <overWrite>false</overWrite>
                  <outputDirectory>${project.build.directory}/classes</outputDirectory>
                  <includes>org/apache/flink/**</includes>
               </artifactItem>

            </artifactItems>
         </configuration>
      </execution>
   </executions>
</plugin>

Of course I copied the created jar to "*flink/build-target/lib*".

And when I ran the job in the local cluster, I saw tweets without any errors.
Half of my problem was solved.

2) Connecting to Elasticsearch - I did the same as for the Twitter connector,
but I still have a problem.

My IntelliJ and the local cluster are on the same machine, so:


"Seeing how you put a loopback address into the transport addresses, are
you sure that an ElasticSearch node runs on every machine?" - *YES, BECAUSE
IT'S ON MY NOTEBOOK*

"Are you sure the elastic cluster is running correctly?" - *YES, BECAUSE I
SEE THE INDEXES, NODE and CLUSTER NAME when I open
http://127.0.0.1:9200/_cat/ or http://127.0.0.1:9200/ etc.*

"Is it possible that the configuration of elastic may be incorrect, or some
of the ports may be blocked?" - *NO, BECAUSE I RUN THIS IN MY INTELLIJ IDEA
AND I DON'T HAVE ANY PROBLEM CONNECTING TO ELASTICSEARCH 2.3.2 ;)*


*My pom with the Elasticsearch connector:*

<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-dependency-plugin</artifactId>
   <version>2.9</version>
   <executions>
      <execution>
         <id>unpack</id>
         <!-- executed just before the package phase -->
         <phase>prepare-package</phase>
         <goals>
            <goal>unpack</goal>
         </goals>
         <configuration>
            <artifactItems>
               <!-- For Flink connector classes -->
               <artifactItem>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-twitter_${scala.version}</artifactId>
                  <version>1.1-SNAPSHOT</version>
                  <type>jar</type>
                  <overWrite>false</overWrite>
                  <outputDirectory>${project.build.directory}/classes</outputDirectory>
                  <includes>org/apache/flink/**</includes>
               </artifactItem>

               <artifactItem>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
                  <version>1.1-SNAPSHOT</version>
                  <type>jar</type>
                  <overWrite>false</overWrite>
                  <outputDirectory>${project.build.directory}/classes</outputDirectory>
                  <includes>org/apache/flink/**</includes>
               </artifactItem>

            </artifactItems>
         </configuration>
      </execution>
   </executions>
</plugin>



And when I copy the jar to "*flink/build-target/lib*" I no longer get the
"not connected to any elasticsearch node" problem, but I get a class-not-found
error for org.elasticsearch.Client.
OK, so I copied the next jar from the mvn repository, and then I got the next
problem, this time with Joda-Time :D

And the next, and the next :D

And it still doesn't work :(



*So what is the correct way to connect to Elasticsearch 2.3.2 in a local
cluster?*
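
A possible reading of the errors above, offered as a sketch rather than a
verified fix: the <includes>org/apache/flink/**</includes> filter unpacks only
Flink's own classes, so the connector's transitive dependencies (the
Elasticsearch client, Joda-Time, and so on) never reach the jar. One common
alternative is to declare the connector as a normal dependency and let
maven-shade-plugin bundle everything into a single job jar. The plugin version
and the overall layout below are assumptions, not something taken from this
thread.

```xml
<!-- Sketch only: the plugin version and layout are assumptions. -->
<dependencies>
  <!-- Default (compile) scope, so the connector and its transitive
       dependencies (Elasticsearch client, Joda-Time, ...) are bundled. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version> <!-- assumed version -->
      <executions>
        <execution>
          <!-- Build the fat jar during the package phase -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

With a setup like this, the Flink core dependencies that the cluster already
provides would typically be marked with <scope>provided</scope> so they are
not bundled a second time.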



2016-05-11 20:35 GMT+02:00 Martin Neumann <[email protected]>:

> Hi,
>
> Are you sure the elastic cluster is running correctly?
>
> Open a browser and try 127.0.0.1:9200 that should give you the overview
> of the cluster. If you don't get it there is something wrong with the
> setup. Its also a good way to double check the cluster.name (I got that
> wrong more than once)
>
> I used to have some connection problems with older elastic versions (don't
> remember which one). I was able to get around it by retrying multiple times.
>
> cheers Martin
>
> On Wed, May 11, 2016 at 7:43 PM, Stephan Ewen <[email protected]> wrote:
>
>> Seeing how you put a loopback address into the transport addresses, are
>> you sure that an ElasticSearch node runs on every machine?
>>
>> On Wed, May 11, 2016 at 7:41 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> ElasticSearch is basically saying that it cannot connect.
>>>
>>> Is it possible that the configuration of elastic may be incorrect, or
>>> some of the ports may be blocked?
>>>
>>>
>>> On Mon, May 9, 2016 at 7:05 PM, rafal green <[email protected]>
>>> wrote:
>>>
>>>> Dear Sir or Madam,
>>>>
>>>> Can you tell me why I have a problem with elasticsearch in local
>>>> cluster?
>>>>
>>>> I analysed this example:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
>>>>
>>>> My flink and elasticsearch config are default (only I change node.name to
>>>> "node-1")
>>>>
>>>> This example run on my IntelliJIdea 15 but on local cluster I have a
>>>> problem. Of course WordCount and SocketTextStreamWordCount works fine.
>>>>
>>>>
>>>> I spend 2 days to try find solution (With uncle google ;) ) but It's
>>>> not easy
>>>>
>>>> val config = new java.util.HashMap[String, String]
>>>> config.put("bulk.flush.max.actions", "1")
>>>> config.put("cluster.name", "*elasticsearch*")
>>>> config.put("path.home", "
>>>> */media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2*
>>>> ")
>>>>
>>>> val transports = new util.ArrayList[InetSocketAddress]
>>>> transports.add(new InetSocketAddress(InetAddress.getByName("*127.0.0.1*
>>>> "),9300))
>>>>
>>>>
>>>>
>>>> Error output:
>>>>
>>>> java.lang.RuntimeException: *Client is not connected to any
>>>> Elasticsearch nodes!*
>>>> at
>>>> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 05/08/2016 22:57:02 Job execution switched to status FAILING.
>>>> java.lang.RuntimeException: Client is not connected to any
>>>> Elasticsearch nodes!
>>>> at
>>>> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 05/08/2016 22:57:02 Job execution switched to status FAILED.
>>>>
>>>> ------------------------------------------------------------
>>>>  The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>>> at
>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
>>>> at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
>>>> at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> at
>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
>>>> at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>>>> at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>> at
>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.RuntimeException: Client is not connected to any
>>>> Elasticsearch nodes!
>>>> at
>>>> org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>> at
>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> Best regards,
>>>> Rafal Greeny
>>>>
>>>
>>>
>>
>
