This should be rather easy to add with the latest addition of the ActorGateway and the message decoration.
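
A minimal sketch of the fail-fast idea, assuming the message decoration is the place where a version could be attached. All names here (VersionedMessage, VersionMismatchException, VersionCheck) are hypothetical and are not existing Flink classes; the version strings in the usage note are only illustrative.

```scala
// Hypothetical sketch, not Flink API: wrap every message in an envelope
// that carries the sender's version, and reject mismatches on receipt.
final case class VersionedMessage(version: String, payload: Any)

final class VersionMismatchException(expected: String, actual: String)
  extends RuntimeException(
    s"Version mismatch: receiver runs $expected, sender runs $actual")

object VersionCheck {
  // Sender side: decorate an outgoing message with the sender's version.
  def decorate(version: String, message: Any): VersionedMessage =
    VersionedMessage(version, message)

  // Receiver side: return the payload, or fail fast if the versions differ
  // or the message was not decorated at all.
  def unwrap(expected: String, message: Any): Any = message match {
    case VersionedMessage(`expected`, payload) => payload
    case VersionedMessage(other, _) =>
      throw new VersionMismatchException(expected, other)
    case _ =>
      throw new VersionMismatchException(expected, "unknown (undecorated message)")
  }
}
```

With something like this, a client built against "0.9.0" talking to a JobManager running "0.10-SNAPSHOT" would get an explicit version error on the first message instead of an unrelated failure later during job submission.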
On Fri, Jul 17, 2015 at 5:04 PM, Stephan Ewen <se...@apache.org> wrote:

> Seems that version mismatches are one of the most common sources of
> issues...
>
> Maybe we should think about putting a version number into the messages (at
> least between client and JobManager) and fail fast on version mismatches...
>
> On Thu, Jul 16, 2015 at 5:56 PM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Good to hear that your problem is solved :-)
>>
>> Cheers,
>> Till
>>
>> On Thu, Jul 16, 2015 at 5:45 PM, Philipp Goetze <
>> philipp.goe...@tu-ilmenau.de> wrote:
>>
>>> Hi Till,
>>>
>>> many thanks for your effort. I finally got it working.
>>>
>>> I'm a bit embarrassed because the issue was solved by using the same
>>> flink-dist-JAR from the locally running Flink version. So to say I used an
>>> older Snapshot version for compiling than for running :-[
>>>
>>> Best Regards,
>>> Philipp
>>>
>>>
>>> On 16.07.2015 17:35, Till Rohrmann wrote:
>>>
>>> Hi Philipp,
>>>
>>> what I usually do to run a Flink program on a cluster from within my
>>> IDE, I create a RemoteExecutionEnvironment. Since I have one UDF (the
>>> map function which doubles the values) defined, I also need to specify the
>>> jar containing this class. In my case, the jar is called
>>> test-1.0-SNAPSHOT.jar.
>>>
>>> def main(args: Array[String]) {
>>>   // set up the execution environment
>>>   val env = ExecutionEnvironment.createRemoteEnvironment("localhost",
>>>     6123, "target/test-1.0-SNAPSHOT.jar")
>>>
>>>   val elements = env.fromElements(1,2,3,4,5)
>>>
>>>   val doubled = elements.map(x => 2*x)
>>>
>>>   doubled.printOnTaskManager("TaskManager")
>>>
>>>   // execute program
>>>   env.execute("Flink Scala API Skeleton")
>>> }
>>>
>>> But I also tried your approach of how to submit jobs to Flink and it
>>> worked for me as well. Therefore, I guess that there is something wrong
>>> with your job. What happens in PigStorage().load?
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Thu, Jul 16, 2015 at 4:35 PM, Philipp Goetze <
>>> philipp.goe...@tu-ilmenau.de> wrote:
>>>
>>>> Hey Tim,
>>>>
>>>> I think my previous mail was intercepted or something similar. However
>>>> you can find my reply below. I already tried a simpler job which just does
>>>> a env.fromElements... but still the same stack.
>>>>
>>>> How do you normally submit jobs (jars) from within the code?
>>>>
>>>> Best Regards,
>>>> Philipp
>>>>
>>>>
>>>> -------- Forwarded Message --------
>>>> Subject: Re: Submitting jobs from within Scala code
>>>> Date: Thu, 16 Jul 2015 14:31:01 +0200
>>>> From: Philipp Goetze <philipp.goe...@tu-ilmenau.de> <philipp.goe...@tu-ilmenau.de>
>>>> To: user@flink.apache.org
>>>>
>>>> Hey,
>>>>
>>>> from the JobManager I do not get any more hints:
>>>>
>>>> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist
>>>> - Received message RequestJobCounts at akka://flink/user/archive
>>>> from Actor[akka://flink/temp/$gc].
>>>> 13:36:06,674 DEBUG org.apache.flink.runtime.jobmanager.MemoryArchivist
>>>> - Handled message RequestJobCounts in 0 ms from
>>>> Actor[akka://flink/temp/$gc].
>>>> 13:36:06,674 DEBUG org.eclipse.jetty.util.log
>>>> - RESPONSE /jobsInfo 200
>>>> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager
>>>> - Received message RequestBlobManagerPort at
>>>> akka://flink/user/jobmanager from
>>>> Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>>>> 13:36:06,965 DEBUG org.apache.flink.runtime.jobmanager.JobManager
>>>> - Handled message RequestBlobManagerPort in 0 ms from
>>>> Actor[akka.tcp://flink@127.0.0.1:43640/temp/$b].
>>>> 13:36:06,984 DEBUG org.apache.flink.runtime.blob.BlobServerConnection
>>>> - Received PUT request for content addressable BLOB
>>>> 13:36:07,086 DEBUG org.apache.flink.runtime.jobmanager.JobManager
>>>> - Received message
>>>> SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) at
>>>> akka://flink/user/jobmanager from
>>>> Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>>>> 13:36:07,087 INFO org.apache.flink.runtime.jobmanager.JobManager
>>>> - Received job 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>>>> 13:36:07,087 DEBUG org.apache.flink.runtime.jobmanager.JobManager
>>>> - Running initialization on master for job
>>>> 146bc2162de9353bbd457a74eda59ae3 (Starting Query).
>>>> 13:36:07,087 ERROR org.apache.flink.runtime.jobmanager.JobManager
>>>> - Failed to submit job 146bc2162de9353bbd457a74eda59ae3 (Starting
>>>> Query)
>>>> org.apache.flink.runtime.client.JobSubmissionException: The vertex null
>>>> (null) has no invokable class.
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:511)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$4.apply(JobManager.scala:507)
>>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:743)
>>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1195)
>>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:507)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
>>>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Starting Query switched from CREATED to FAILING.
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Starting Query switched from FAILING to FAILED.
>>>> 13:36:07,088 DEBUG org.apache.flink.runtime.jobmanager.JobManager
>>>> - Handled message
>>>> SubmitJob(org.apache.flink.runtime.jobgraph.JobGraph@18746a85,true) in 1
>>>> ms from Actor[akka.tcp://flink@127.0.0.1:43640/user/$a#224238443].
>>>> 13:36:07,524 DEBUG Remoting
>>>> - Remote system with address [akka.tcp://flink@127.0.0.1:43640] has
>>>> shut down. Address is now gated for 5000 ms, all messages to this address
>>>> will be delivered to dead letters.
>>>>
>>>>
>>>> The code of the job is quite simple (just a test-case).
>>>> As stated before it works when using the wrapper script and the web
>>>> client. I think something is wrong in the submitJar method I posted
>>>> earlier. But here the code of the submitted job:
>>>>
>>>> import org.apache.flink.api.scala._
>>>>
>>>> import dbis.flink._
>>>>
>>>> object load {
>>>>
>>>>   def tupleAToString(t: List[Any]): String = {
>>>>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
>>>>     val sb = new StringBuilder
>>>>     sb.append(t(0))
>>>>     sb.toString
>>>>   }
>>>>
>>>>   def main(args: Array[String]) {
>>>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>     val A = PigStorage().load(env,
>>>>       "/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/src/it/resources/file.txt",
>>>>       '\t')
>>>>
>>>>     A.map(t =>
>>>>       tupleAToString(t)).writeAsText("/home/blaze/Documents/TU_Ilmenau/Masterthesis/projects/pigspark/result1.out")
>>>>
>>>>     env.execute("Starting Query")
>>>>   }
>>>>
>>>> }
>>>>
>>>>
>>>> Best Regards,
>>>> Philipp
>>>>