Thanks a lot for all the answers :) I wrote my last email because I didn't understand the difference between running on a LOCAL cluster (one node) and running in IntelliJ IDEA. Now I know :P
https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html

If you read the section *"Linking with modules not contained in the binary distribution"* and check the pom in the flink directory, you will see that it doesn't have this:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch2_2.10</artifactId>
      <version>1.1-SNAPSHOT</version>
    </dependency>

So Flink doesn't contain the CONNECTORS (Twitter, Elasticsearch, etc.) in the binary distribution. Now I understand, and I added this to *my pom*:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <!-- executed just before the package phase -->
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <!-- For Flink connector classes -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-twitter_${scala.version}</artifactId>
                <version>1.1-SNAPSHOT</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>

Of course I copied the created jar to *"flink/build-target/lib"*. And when I ran the job on the local cluster I saw tweets without any errors. Half of my problem was resolved.

2) Connect to Elasticsearch - I did the same as for the Twitter connector but I still have a problem. My IntelliJ and the local cluster are on the same machine, so:

"Seeing how you put a loopback address into the transport addresses, are you sure that an ElasticSearch node runs on every machine?"
- *YES, BECAUSE IT'S ON MY NOTEBOOK*

"Are you sure the elastic cluster is running correctly?"
- *YES, BECAUSE I SEE THE INDEXES, NODE and CLUSTER NAME when I open http://127.0.0.1:9200/_cat/ or http://127.0.0.1:9200/ etc.*

"Is it possible that the configuration of elastic may be incorrect, or some of the ports may be blocked?"
- *NO, BECAUSE I RUN THIS IN MY INTELLIJ IDEA AND I DON'T HAVE ANY PROBLEM CONNECTING TO ELASTICSEARCH 2.3.2 ;)*

*My pom with the Elasticsearch connector:*

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <!-- executed just before the package phase -->
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <!-- For Flink connector classes -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-twitter_${scala.version}</artifactId>
                <version>1.1-SNAPSHOT</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
                <version>1.1-SNAPSHOT</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>

And when I copy the jar to *"flink/build-target/lib"* I no longer get "not connected to any elasticsearch node", but I get a class-not-found error for org.elasticsearch.Client. OK, so I copied the next jar from the mvn repository, and then I got the next problem, with Joda-Time :D and the next, and the next :D And it doesn't work :(

*So what is the correct way to connect to Elasticsearch 2.3.2 on a local cluster?*

2016-05-11 20:35 GMT+02:00 Martin Neumann <[email protected]>:

> Hi,
>
> Are you sure the elastic cluster is
running correctly?
>
> Open a browser and try 127.0.0.1:9200 that should give you the overview
> of the cluster. If you don't get it there is something wrong with the
> setup. Its also a good way to double check the cluster.name (I got that
> wrong more than once)
>
> I used to have some connection problems with older elastic versions (don't
> remember which one). I was able to get around it by retrying multiple times.
>
> cheers Martin
>
> On Wed, May 11, 2016 at 7:43 PM, Stephan Ewen <[email protected]> wrote:
>
>> Seeing how you put a loopback address into the transport addresses, are
>> you sure that an ElasticSearch node runs on every machine?
>>
>> On Wed, May 11, 2016 at 7:41 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> ElasticSearch is basically saying that it cannot connect.
>>>
>>> Is it possible that the configuration of elastic may be incorrect, or
>>> some of the ports may be blocked?
>>>
>>>
>>> On Mon, May 9, 2016 at 7:05 PM, rafal green <[email protected]>
>>> wrote:
>>>
>>>> Dear Sir or Madam,
>>>>
>>>> Can you tell me why I have a problem with elasticsearch in local
>>>> cluster?
>>>>
>>>> I analysed this example:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html
>>>>
>>>> My flink and elasticsearch config are default (only I change node.name to
>>>> "node-1")
>>>>
>>>> This example run on my IntelliJIdea 15 but on local cluster I have a
>>>> problem. Of course WordCount and SocketTextStreamWordCount works fine.
>>>>
>>>>
>>>> I spend 2 days to try find solution (With uncle google ;) ) but It's
>>>> not easy
>>>>
>>>> val config = new java.util.HashMap[String, String]
>>>> config.put("bulk.flush.max.actions", "1")
>>>> config.put("cluster.name", "elasticsearch")
>>>> config.put("path.home", "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2")
>>>>
>>>> val transports = new util.ArrayList[InetSocketAddress]
>>>> transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
>>>>
>>>>
>>>>
>>>> Error output:
>>>>
>>>> java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
>>>>     at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 05/08/2016 22:57:02 Job execution switched to status FAILING.
>>>> java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
>>>>     at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> 05/08/2016 22:57:02 Job execution switched to status FAILED.
>>>>
>>>> ------------------------------------------------------------
>>>> The program finished with the following exception:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>>>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541)
>>>>     at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69)
>>>>     at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>     at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860)
>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327)
>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187)
>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753)
>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
>>>>     at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> Best regards,
>>>> Rafal Greeny
>>>>
>>>
>>>
>>
>
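
A note on the question in this email, as a sketch rather than a verified fix: the unpack step only copies classes matching org/apache/flink/**, and the maven-dependency-plugin "unpack" goal does not pull in transitive dependencies, so the Elasticsearch client, Joda-Time and the rest would never reach the job jar. One common alternative is to declare the connector as a normal dependency and build a fat jar with the maven-shade-plugin. The plugin version, the signature-file filters and the services transformer below are assumptions, not taken from this thread:

```xml
<!-- Sketch only: plugin version, filters and transformer are assumptions. -->
<dependencies>
  <!-- Connector as a regular dependency, so Maven resolves its transitive
       dependencies (Elasticsearch client, Joda-Time, ...) for the fat jar -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch2_${scala.version}</artifactId>
    <version>1.1-SNAPSHOT</version>
  </dependency>
</dependencies>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
      <executions>
        <execution>
          <!-- Build the fat jar during the package phase -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <!-- Drop signature files so the merged jar is not rejected -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <!-- Merge META-INF/services entries from all bundled jars -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

With something like this in the pom, "mvn clean package" should produce a single jar that can be submitted to the local cluster without copying anything into flink/build-target/lib. Flink's own core dependencies can be given "provided" scope so they are not bundled a second time.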
