here is my main class:

public static void main(String[] args) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = pro.getProperty("flink.output");
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }
    // set number of bulk iterations for KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers"); //print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}
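For readers following along: the three user functions in the pipeline above (SelectNearestCenter, CentroidAccumulator, CentroidAverager) are not shown in the thread. As a rough sketch of what one bulk iteration computes, here is the same logic in plain Java - the class and method names and the double[] point representation below are made up for illustration, not the GeoTimeData types from the actual job:

```java
import java.util.Arrays;

public class KMeansSketch {

    // What map(new SelectNearestCenter()) presumably does per point:
    // return the index of the closest centroid (squared Euclidean distance).
    public static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int i = 0; i < centroids.length; i++) {
            double d = 0;
            for (int j = 0; j < point.length; j++) {
                double diff = point[j] - centroids[i][j];
                d += diff * diff;
            }
            if (d < bestDist) { bestDist = d; best = i; }
        }
        return best;
    }

    // What groupBy(0).reduceGroup(CentroidAccumulator).map(CentroidAverager)
    // presumably amounts to: sum the points assigned to each centroid,
    // then divide by the count to get the new centroid positions.
    public static double[][] recompute(double[][] points, double[][] centroids) {
        double[][] sums = new double[centroids.length][centroids[0].length];
        int[] counts = new int[centroids.length];
        for (double[] p : points) {
            int c = nearest(p, centroids);
            counts[c]++;
            for (int j = 0; j < p.length; j++) sums[c][j] += p[j];
        }
        for (int i = 0; i < sums.length; i++) {
            if (counts[i] == 0) { sums[i] = centroids[i]; continue; } // keep empty centroid
            for (int j = 0; j < sums[i].length; j++) sums[i][j] /= counts[i];
        }
        return sums;
    }

    public static void main(String[] args) {
        double[][] points = {{0, 0}, {0, 1}, {10, 10}, {10, 11}};
        double[][] centroids = {{0, 0}, {10, 10}};
        // One iteration: each cluster's centroid moves to the mean of its points.
        System.out.println(Arrays.deepToString(recompute(points, centroids)));
        // [[0.0, 0.5], [10.0, 10.5]]
    }
}
```

In the Flink version this recompute step runs inside `centroids.iterate(maxIteration)`, with the current centroids distributed to all mappers via the "centroids" broadcast set.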
Maybe I can't use the following for HDFS?

clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
finalCentroids.writeAsText(outputPath + "/centers"); //print();

2015-06-04 17:53 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:

> I have changed the permissions for the cloudera user and tried the
> following command, and the files exist on HDFS ;) I set the paths in my
> properties file like "flink.output=/user/cloudera/outputs/output_flink"
> and I get the same exception again; maybe the problem has another cause?
>
> [cloudera@quickstart bin]$ sudo su yarn
> bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs
> bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs
> bash-4.1$ exit
> exit
> [cloudera@quickstart bin]$ sudo ./flink run /home/cloudera/Desktop/ma-flink.jar
> log4j:WARN No appenders could be found for logger
> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
> Found YARN properties file > /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties > Using JobManager address from YARN properties quickstart.cloudera/ > 127.0.0.1:52601 > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Failed to submit job fe78e9ee50cf76ac8b487919e1c951fa > (KMeans Flink) > at org.apache.flink.client.program.Client.run(Client.java:412) > at org.apache.flink.client.program.Client.run(Client.java:355) > at org.apache.flink.client.program.Client.run(Client.java:348) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63) > at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) > at org.apache.flink.client.program.Client.run(Client.java:315) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed > to submit job fe78e9ee50cf76ac8b487919e1c951fa (KMeans Flink) > > at org.apache.flink.runtime.jobmanager.JobManager.org > $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192) > at > 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99) > at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.JobException: Creating the input > splits caused an error: File /user/cloudera/inputs does not exist or the > user running Flink ('yarn') has insufficient permissions to access it. 
> at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471) > at org.apache.flink.runtime.jobmanager.JobManager.org > $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535) > ... 21 more > Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs does > not exist or the user running Flink ('yarn') has insufficient permissions > to access it. > at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106) > at > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390) > at > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146) > ... 23 more > > > 2015-06-04 17:38 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>: > >> sorry, i see my yarn end before i can run my app, i must set the write >> access for yarn, maybe this solve my problem. >> >> 2015-06-04 17:33 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>: >> >>> i start the yarn-session.sh with sudo >>> and than the flink run command with sudo, >>> i get the following exception: >>> >>> cloudera@quickstart bin]$ sudo ./flink run >>> /home/cloudera/Desktop/ma-flink.jar >>> log4j:WARN No appenders could be found for logger >>> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). >>> log4j:WARN Please initialize the log4j system properly. >>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig >>> for more info. 
>>> org.apache.flink.client.program.ProgramInvocationException: Failed to >>> resolve JobManager >>> at org.apache.flink.client.program.Client.run(Client.java:378) >>> at org.apache.flink.client.program.Client.run(Client.java:355) >>> at org.apache.flink.client.program.Client.run(Client.java:348) >>> at >>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63) >>> at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>> at >>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.lang.reflect.Method.invoke(Method.java:606) >>> at >>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >>> at >>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >>> at org.apache.flink.client.program.Client.run(Client.java:315) >>> at >>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >>> at >>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880) >>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >>> Caused by: java.io.IOException: JobManager at akka.tcp:// >>> flink@127.0.0.1:6123/user/jobmanager not reachable. Please make sure >>> that the JobManager is running and its port is reachable. 
>>> at >>> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1198) >>> at >>> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1222) >>> at >>> org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1240) >>> at >>> org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala) >>> at org.apache.flink.client.program.Client.run(Client.java:375) >>> ... 15 more >>> Caused by: akka.actor.ActorNotFound: Actor not found for: >>> ActorSelection[Anchor(akka.tcp://flink@127.0.0.1:6123/), >>> Path(/user/jobmanager)] >>> at >>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) >>> at >>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) >>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >>> at >>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) >>> at >>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) >>> at >>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) >>> at >>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) >>> at >>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) >>> at >>> akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) >>> at >>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) >>> at >>> akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) >>> at >>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) >>> at >>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) >>> at >>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) >>> at 
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) >>> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) >>> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) >>> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) >>> at >>> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) >>> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561) >>> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475) >>> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415) >>> at >>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) >>> at >>> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) >>> at akka.actor.ActorCell.terminate(ActorCell.scala:369) >>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) >>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) >>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279) >>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >>> at >>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> >>> the FlinkMain.java: 70 is: >>> >>> env.execute("KMeans Flink"); >>> >>> >>> 2015-06-04 17:17 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>: >>> >>>> i try this: >>>> >>>> [cloudera@quickstart bin]$ sudo su yarn >>>> bash-4.1$ hadoop fs -chmod 777 /user/cloudera/outputs >>>> chmod: changing permissions of '/user/cloudera/outputs': Permission >>>> denied. 
user=yarn is not the owner of inode=outputs >>>> bash-4.1$ hadoop fs -chmod 777 /user/cloudera/inputs >>>> chmod: changing permissions of '/user/cloudera/inputs': Permission >>>> denied. user=yarn is not the owner of inode=inputs >>>> bash-4.1$ exit >>>> exit >>>> [cloudera@quickstart bin]$ sudo ./flink run >>>> /home/cloudera/Desktop/ma-flink.jar >>>> log4j:WARN No appenders could be found for logger >>>> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). >>>> log4j:WARN Please initialize the log4j system properly. >>>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig >>>> for more info. >>>> Found YARN properties file >>>> /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties >>>> Using JobManager address from YARN properties quickstart.cloudera/ >>>> 127.0.0.1:53874 >>>> org.apache.flink.client.program.ProgramInvocationException: The program >>>> execution failed: Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 >>>> (KMeans Flink) >>>> at org.apache.flink.client.program.Client.run(Client.java:412) >>>> at org.apache.flink.client.program.Client.run(Client.java:355) >>>> at org.apache.flink.client.program.Client.run(Client.java:348) >>>> at >>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63) >>>> at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>> at >>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>>> at >>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>> at java.lang.reflect.Method.invoke(Method.java:606) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >>>> at >>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >>>> at org.apache.flink.client.program.Client.run(Client.java:315) >>>> at >>>> 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >>>> at >>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880) >>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>> Failed to submit job 2f46ef5dff4ecf5552b3477ed1c6f4b9 (KMeans Flink) >>>> at org.apache.flink.runtime.jobmanager.JobManager.org >>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595) >>>> at >>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >>>> at >>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >>>> at >>>> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99) >>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >>>> at >>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) >>>> at >>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) >>>> at >>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >>>> at >>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) >>>> at >>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94) >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221) >>>> 
at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>> at >>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>> Caused by: org.apache.flink.runtime.JobException: Creating the input >>>> splits caused an error: File /user/cloudera/inputs does not exist or the >>>> user running Flink ('yarn') has insufficient permissions to access it. >>>> at >>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162) >>>> at >>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471) >>>> at org.apache.flink.runtime.jobmanager.JobManager.org >>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535) >>>> ... 21 more >>>> Caused by: java.io.FileNotFoundException: File /user/cloudera/inputs >>>> does not exist or the user running Flink ('yarn') has insufficient >>>> permissions to access it. >>>> at >>>> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106) >>>> at >>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390) >>>> at >>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51) >>>> at >>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146) >>>> ... 23 more >>>> >>>> >>>> 2015-06-04 17:15 GMT+02:00 Robert Metzger <rmetz...@apache.org>: >>>> >>>>> As the output of the "hadoop" tool indicates, it expects two >>>>> arguments, you only passed one (777). >>>>> The second argument it is expecting is the path to the file you want >>>>> to change. 
>>>>> >>>>> In your case, it is: >>>>> hadoop fs -chmod 777 /user/cloudera/outputs >>>>> >>>>> >>>>> The reason why >>>>> hadoop fs -chmod 777 * >>>>> does not work is the following: the * is evaluated by your local bash >>>>> and expanded to the files which are present in your current, local >>>>> directory. The bash expansion is not able to expand to the files in HDFS. >>>>> >>>>> >>>>> On Thu, Jun 4, 2015 at 5:08 PM, Pa Rö <paul.roewer1...@googlemail.com> >>>>> wrote: >>>>> >>>>>> [cloudera@quickstart bin]$ sudo su yarn >>>>>> bash-4.1$ hadoop fs -chmod 777 >>>>>> -chmod: Not enough arguments: expected 2 but got 1 >>>>>> Usage: hadoop fs [generic options] -chmod [-R] <MODE[,MODE]... | >>>>>> OCTALMODE> PATH... >>>>>> bash-4.1$ >>>>>> >>>>>> you understand? >>>>>> >>>>>> 2015-06-04 17:04 GMT+02:00 Robert Metzger <rmetz...@apache.org>: >>>>>> >>>>>>> It looks like the user "yarn" which is running Flink doesn't have >>>>>>> permission to access the files. >>>>>>> >>>>>>> Can you do "sudo su yarn" to become the "yarn" user. Then, you can >>>>>>> do "hadoop fs -chmod 777" to make the files accessible for everyone. >>>>>>> >>>>>>> >>>>>>> On Thu, Jun 4, 2015 at 4:59 PM, Pa Rö < >>>>>>> paul.roewer1...@googlemail.com> wrote: >>>>>>> >>>>>>>> okay, it's work, i get a exception: >>>>>>>> >>>>>>>> [cloudera@quickstart Desktop]$ cd flink-0.9-SNAPSHOT/bin/ >>>>>>>> [cloudera@quickstart bin]$ flink run >>>>>>>> /home/cloudera/Desktop/ma-flink.jar >>>>>>>> bash: flink: command not found >>>>>>>> [cloudera@quickstart bin]$ ./flink run >>>>>>>> /home/cloudera/Desktop/ma-flink.jar >>>>>>>> log4j:WARN No appenders could be found for logger >>>>>>>> (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). >>>>>>>> log4j:WARN Please initialize the log4j system properly. >>>>>>>> log4j:WARN See >>>>>>>> http://logging.apache.org/log4j/1.2/faq.html#noconfig for more >>>>>>>> info. 
>>>>>>>> Found YARN properties file >>>>>>>> /home/cloudera/Desktop/flink-0.9-SNAPSHOT/bin/../conf/.yarn-properties >>>>>>>> Using JobManager address from YARN properties quickstart.cloudera/ >>>>>>>> 127.0.0.1:53874 >>>>>>>> java.io.IOException: Mkdirs failed to create /user/cloudera/outputs >>>>>>>> at >>>>>>>> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438) >>>>>>>> at >>>>>>>> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) >>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905) >>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886) >>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783) >>>>>>>> at >>>>>>>> mgm.tp.bigdata.ma_commons.commons.Seeding.randomSeeding(Seeding.java:21) >>>>>>>> at >>>>>>>> mgm.tp.bigdata.ma_flink.FlinkMain.getCentroidDataSet(FlinkMain.java:178) >>>>>>>> at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:47) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:315) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >>>>>>>> 
org.apache.flink.client.program.ProgramInvocationException: The >>>>>>>> program execution failed: Failed to submit job >>>>>>>> 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink) >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:412) >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:355) >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:348) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63) >>>>>>>> at mgm.tp.bigdata.ma_flink.FlinkMain.main(FlinkMain.java:70) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353) >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:315) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584) >>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:880) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922) >>>>>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: >>>>>>>> Failed to submit job 934743a5c49c6d5e31c9e8201452e36d (KMeans Flink) >>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org >>>>>>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:595) >>>>>>>> at >>>>>>>> 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:192) >>>>>>>> at >>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) >>>>>>>> at >>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) >>>>>>>> at >>>>>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) >>>>>>>> at >>>>>>>> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:99) >>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) >>>>>>>> at >>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) >>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94) >>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221) >>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>>>> at >>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>>>> Caused by: org.apache.flink.runtime.JobException: 
Creating the >>>>>>>> input splits caused an error: File /user/cloudera/outputs/seed-1 does >>>>>>>> not >>>>>>>> exist or the user running Flink ('yarn') has insufficient permissions >>>>>>>> to >>>>>>>> access it. >>>>>>>> at >>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:471) >>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org >>>>>>>> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:535) >>>>>>>> ... 21 more >>>>>>>> Caused by: java.io.FileNotFoundException: File >>>>>>>> /user/cloudera/outputs/seed-1 does not exist or the user running Flink >>>>>>>> ('yarn') has insufficient permissions to access it. >>>>>>>> at >>>>>>>> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:106) >>>>>>>> at >>>>>>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:390) >>>>>>>> at >>>>>>>> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:51) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146) >>>>>>>> ... 23 more >>>>>>>> >>>>>>>> >>>>>>>> how i must set the files in the hdfs? >>>>>>>> quickstart.cloudera:50075/home/cloudera/output? >>>>>>>> >>>>>>>> 2015-06-04 16:51 GMT+02:00 Robert Metzger <rmetz...@apache.org>: >>>>>>>> >>>>>>>>> Once you've started the YARN session, you can submit a Flink job >>>>>>>>> with "./bin/flink run <pathToYourJar>". >>>>>>>>> >>>>>>>>> The jar file of your job doesn't need to be in HDFS. It has to be >>>>>>>>> in the local file system and flink will send it to all machines. >>>>>>>>> >>>>>>>>> On Thu, Jun 4, 2015 at 4:48 PM, Pa Rö < >>>>>>>>> paul.roewer1...@googlemail.com> wrote: >>>>>>>>> >>>>>>>>>> okay, now it run on my hadoop. >>>>>>>>>> how i can start my flink job? 
And where must the jar file
>>>>>>>>>> be saved - on HDFS or as a local file?
>>>>>>>>>>
>>>>>>>>>> 2015-06-04 16:31 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>
>>>>>>>>>>> Yes, you have to run these commands in the command line of the
>>>>>>>>>>> Cloudera VM.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 4, 2015 at 4:28 PM, Pa Rö
>>>>>>>>>>> <paul.roewer1...@googlemail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You mean run this command in a terminal/shell, and not define a
>>>>>>>>>>>> hue job?
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-06-04 16:25 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>>>
>>>>>>>>>>>>> It should certainly be possible to run Flink on a cloudera
>>>>>>>>>>>>> live VM.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think these are the commands you need to execute:
>>>>>>>>>>>>>
>>>>>>>>>>>>> wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
>>>>>>>>>>>>> tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
>>>>>>>>>>>>> cd flink-0.9-SNAPSHOT/
>>>>>>>>>>>>> export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop/
>>>>>>>>>>>>> ./bin/yarn-session.sh -n 1 -jm 1024 -tm 1024
>>>>>>>>>>>>>
>>>>>>>>>>>>> If that is not working for you, please post the exact error
>>>>>>>>>>>>> message you are getting and I can help you to get it to run.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 4, 2015 at 4:18 PM, Pa Rö
>>>>>>>>>>>>> <paul.roewer1...@googlemail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> hi robert,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the problem is the hue api;
>>>>>>>>>>>>>> I had the same problem with the spark submit script,
>>>>>>>>>>>>>> but the new hue release has a spark submit api.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I asked the group about the same problem with spark - no reply.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I want to test my app on a local cluster before I run it on
>>>>>>>>>>>>>> the big cluster; for that I use cloudera live. Maybe there is
>>>>>>>>>>>>>> another way to test flink on a local cluster VM?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2015-06-04 16:12 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Paul,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> why did running Flink from the regular scripts not work for
>>>>>>>>>>>>>>> you?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm not an expert on Hue, I would recommend asking in the
>>>>>>>>>>>>>>> Hue user forum / mailing list:
>>>>>>>>>>>>>>> https://groups.google.com/a/cloudera.org/forum/#!forum/hue-user
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 4, 2015 at 4:09 PM, Pa Rö
>>>>>>>>>>>>>>> <paul.roewer1...@googlemail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> thanks,
>>>>>>>>>>>>>>>> now I want to run my app on the cloudera live VM (single node).
>>>>>>>>>>>>>>>> How can I define my flink job with hue?
>>>>>>>>>>>>>>>> I tried to run the flink script from HDFS; it does not work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> best regards,
>>>>>>>>>>>>>>>> paul
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2015-06-02 14:50 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I would recommend using HDFS.
>>>>>>>>>>>>>>>>> For that, you need to specify the paths like this:
>>>>>>>>>>>>>>>>> hdfs:///path/to/data.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jun 2, 2015 at 2:48 PM, Pa Rö
>>>>>>>>>>>>>>>>> <paul.roewer1...@googlemail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> nice,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> which file system must I use for the cluster? java.io or
>>>>>>>>>>>>>>>>>> hadoop.fs or flink?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2015-06-02 14:29 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> you can start Flink on YARN on the Cloudera distribution.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> See here for more:
>>>>>>>>>>>>>>>>>>> http://ci.apache.org/projects/flink/flink-docs-master/setup/yarn_setup.html
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> These are the commands you need to execute:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wget http://stratosphere-bin.s3-website-us-east-1.amazonaws.com/flink-0.9-SNAPSHOT-bin-hadoop2.tgz
>>>>>>>>>>>>>>>>>>> tar xvzf flink-0.9-SNAPSHOT-bin-hadoop2.tgz
>>>>>>>>>>>>>>>>>>> cd flink-0.9-SNAPSHOT/
>>>>>>>>>>>>>>>>>>> ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Jun 2, 2015 at 2:03 PM, Pa Rö
>>>>>>>>>>>>>>>>>>> <paul.roewer1...@googlemail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> hi community,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I want to test my flink k-means on a hadoop cluster. I use
>>>>>>>>>>>>>>>>>>>> the cloudera live distribution. How can I run flink on
>>>>>>>>>>>>>>>>>>>> this cluster? Maybe only the java dependencies are enough?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> best regards,
>>>>>>>>>>>>>>>>>>>> paul
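Pulling the thread together: the recurring clue in the stack traces above is the frame org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus. A path like /user/cloudera/inputs carries no file-system scheme, so it can end up resolved against the local file system instead of HDFS, which matches Robert's earlier advice to write paths as hdfs:///path/to/data. A small, self-contained illustration of the scheme distinction, using plain java.net.URI rather than Flink's own path handling:

```java
import java.net.URI;

public class SchemeCheck {
    public static void main(String[] args) {
        // A bare absolute path has no scheme at all, so a file-system
        // framework has to fall back to some default implementation:
        URI bare = URI.create("/user/cloudera/inputs");
        System.out.println(bare.getScheme());  // null

        // An explicit HDFS URI names the file system to use:
        URI hdfs = URI.create("hdfs:///user/cloudera/inputs");
        System.out.println(hdfs.getScheme());  // hdfs
        System.out.println(hdfs.getPath());    // /user/cloudera/inputs
    }
}
```

So setting the properties to scheme-qualified paths, e.g. flink.output=hdfs:///user/cloudera/outputs/output_flink (and likewise for the input path), should direct the job at HDFS regardless of which default file system the Flink client is configured with - assuming, as the traces suggest, that the quickstart VM's setup is not already pointing Flink's default at HDFS.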