Hi Aljoscha,

I was able to get the ClusterClient and the accumulators using the following:
DefaultCLI defaultCLI = new DefaultCLI();
CommandLine line = new DefaultParser().parse(new Options(), new String[]{}, true);
ClusterClient clusterClient = defaultCLI.retrieveCluster(line, configuration);

Regards
Sumit Chawla

On Thu, Sep 22, 2016 at 4:55 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> there is ClusterClient.getAccumulators(JobID jobID) which should be able to
> get the accumulators for a running job. If you can construct a
> ClusterClient that should be a good solution.
>
> Cheers,
> Aljoscha
>
> On Wed, 21 Sep 2016 at 21:15 Chawla,Sumit <sumitkcha...@gmail.com> wrote:
>
> > Hi Sean
> >
> > My goal here is to get User Accumulators. I know the REST calls exist,
> > but since I am running my code in the same JVM, I wanted to avoid
> > going over HTTP. I saw this code in JobAccumulatorsHandler and tried to use
> > it. Would you suggest some alternative approach to avoid this
> > over-the-network serialization for Akka?
> >
> > Regards
> > Sumit Chawla
> >
> > On Wed, Sep 21, 2016 at 11:37 AM, Stephan Ewen <se...@apache.org> wrote:
> >
> > > Between two different actor systems in the same JVM, messages are still
> > > serialized (they go through a local socket, I think).
> > >
> > > Getting the execution graph is not easily possible, and not intended, as it
> > > actually contains RPC resources, etc.
> > >
> > > What do you need from the execution graph? Maybe there is another way to
> > > achieve that...
> > >
> > > On Wed, Sep 21, 2016 at 8:08 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> > > wrote:
> > >
> > > > Hi Chesnay
> > > >
> > > > I am actually running this code in the same JVM as the WebInterface and
> > > > JobManager. I am starting the JobManager programmatically and then
> > > > running this code in the same JVM to query metrics. The only difference could be
> > > > that I am creating a new Akka ActorSystem and ActorGateway. Not sure if that
> > > > forces it to execute the code as if the request were coming over the wire. I am
> > > > not very familiar with Akka internals, so maybe somebody can shed some
> > > > light on it.
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > > On Wed, Sep 21, 2016 at 1:06 AM, Chesnay Schepler <ches...@apache.org>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > this is a rather subtle issue you stumbled upon here.
> > > > >
> > > > > The ExecutionGraph is not serializable. The only reason why the
> > > > > WebInterface can access it is because it runs in the same JVM as the
> > > > > JobManager.
> > > > >
> > > > > I'm not sure if there is a way for what you are trying to do.
> > > > >
> > > > > Regards,
> > > > > Chesnay
> > > > >
> > > > > On 21.09.2016 06:11, Chawla,Sumit wrote:
> > > > >
> > > > >> Hi All
> > > > >>
> > > > >> I am trying to get JOB accumulators. (I am aware that I can get the
> > > > >> accumulators through the REST APIs as well, but I wanted to avoid JSON
> > > > >> parsing.)
> > > > >>
> > > > >> Looking at JobAccumulatorsHandler, I am trying to get the execution graph for
> > > > >> the currently running job.
> > > > >> Following is my code:
> > > > >>
> > > > >> InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
> > > > >> InetAddress ownHostname;
> > > > >> ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
> > > > >>
> > > > >> ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
> > > > >>         new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
> > > > >>
> > > > >> FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);
> > > > >>
> > > > >> ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
> > > > >>         LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
> > > > >>         actorSystem, timeout);
> > > > >>
> > > > >> Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
> > > > >>
> > > > >> MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);
> > > > >> ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
> > > > >> LOG.info(result.toString());
> > > > >> for (JobDetails detail : result.getRunningJobs()) {
> > > > >>     LOG.info(detail.getJobName() + " ID " + detail.getJobId());
> > > > >>
> > > > >>     *ExecutionGraph executionGraph = executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);*
> > > > >>
> > > > >>     LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
> > > > >> }
> > > > >>
> > > > >> However, I am receiving the following error in Flink:
> > > > >>
> > > > >> 2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> > > > >> java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > > > >>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
> > > > >>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
> > > > >>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>     at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
> > > > >>     at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
> > > > >>     at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>     at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
> > > > >>     at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]
> > > > >>
> > > > >> Any reason why it's failing? This code works when invoked through
> > > > >> WebRuntimeMonitor.
> > > > >>
> > > > >> Regards
> > > > >> Sumit Chawla
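
The NotSerializableException in the quoted trace can be reproduced outside Flink with plain Java serialization. The sketch below is only an illustration under assumptions: Graph and Coordinator are hypothetical stand-ins (not Flink classes) for an object that is Serializable itself but holds a reference to a non-serializable component, which is what the trace suggests happens when the ExecutionGraph is sent from one actor system to another. The class name SerializationSketch and the helper canSerialize are likewise made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

public class SerializationSketch {

    // Hypothetical stand-in for a runtime component (e.g. a checkpoint
    // coordinator): it does NOT implement Serializable.
    static class Coordinator {
    }

    // Hypothetical stand-in for the execution graph: Serializable itself,
    // but one of its non-transient fields is not.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final Coordinator coordinator = new Coordinator();
        final HashMap<String, Long> accumulators = new HashMap<>();
    }

    // Returns true if standard Java serialization of the object succeeds,
    // false if some object in its graph is not serializable.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            // Not expected for an in-memory stream; surface it unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Graph graph = new Graph();
        graph.accumulators.put("records", 42L);

        // The whole graph drags the non-serializable coordinator along.
        System.out.println("whole graph: " + canSerialize(graph));

        // A plain copy of just the accumulator values serializes fine.
        System.out.println("accumulators only: "
                + canSerialize(new HashMap<>(graph.accumulators)));
    }
}
```

Under these assumptions, serializing the whole graph fails while a plain map of accumulator values succeeds. That is roughly why requesting only the accumulators (for example through ClusterClient.getAccumulators, as suggested in the thread) is workable where shipping the ExecutionGraph between actor systems is not.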