Hi All
I am trying to get job accumulators. (I am aware that I can get the accumulators through the REST APIs as well, but I wanted to avoid JSON parsing.) Looking at JobAccumulatorsHandler, I am trying to get the execution graph for the currently running job. Following is my code; the line of interest is the call to executionGraphHolder.getExecutionGraph:

    InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
    InetAddress ownHostname;
    ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);

    ActorSystem actorSystem = AkkaUtils.createActorSystem(configuration,
        new Some(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));

    FiniteDuration timeout = FiniteDuration.apply(10, TimeUnit.SECONDS);

    ActorGateway akkaActorGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
        LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
        actorSystem, timeout);

    Future<Object> future = akkaActorGateway.ask(new RequestJobDetails(true, false), timeout);
    MultipleJobsDetails result = (MultipleJobsDetails) Await.result(future, timeout);

    ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder(timeout);
    LOG.info(result.toString());

    for (JobDetails detail : result.getRunningJobs()) {
        LOG.info(detail.getJobName() + " ID " + detail.getJobId());
        ExecutionGraph executionGraph = executionGraphHolder.getExecutionGraph(detail.getJobId(), akkaActorGateway);
        LOG.info("Accumulators " + executionGraph.aggregateUserAccumulators());
    }

However, I am receiving the following error in Flink:

    2016-09-20T14:19:53,201 [flink-akka.actor.default-dispatcher-3] nobody ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
    java.io.NotSerializableException: org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) ~[?:1.8.0_92]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_92]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.10-2.3.7.jar:?]
        at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.10-2.3.7.jar:?]
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.5.jar:?]
        at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.10-2.3.7.jar:?]

Any reason why it is failing? This code works when invoked through WebRuntimeMonitor.

Regards
Sumit Chawla
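
For reference, the shape of the trace above (nested writeObject0 / defaultWriteFields frames ending in NotSerializableException) can be reproduced with plain JDK classes. The sketch below is only an illustration of that Java serialization behavior: the names Coordinator and Graph are hypothetical stand-ins, not Flink classes, and whether this is exactly what happens to the ExecutionGraph reply when it is sent over a remote Akka connection is an assumption, not something the trace alone confirms.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NotSerializableDemo {

    // Hypothetical stand-in for a component that does not implement Serializable.
    static class Coordinator {
    }

    // Hypothetical stand-in for a Serializable object that holds such a component.
    static class Graph implements Serializable {
        private static final long serialVersionUID = 1L;
        final Coordinator coordinator = new Coordinator();
    }

    // Tries to Java-serialize the object into an in-memory buffer.
    // Returns null on success, or the offending class name reported by
    // NotSerializableException on failure.
    static String trySerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Graph itself is Serializable, but its field is not, so serialization fails
        // while writing the fields of Graph.
        System.out.println("Graph:  " + trySerialize(new Graph()));
        // A String is Serializable, so this succeeds.
        System.out.println("String: " + trySerialize("ok"));
    }
}
```

If the same mechanism applies here, an object that is passed by reference inside a single JVM would never go through this serialization step, while the same object sent to a separate actor system would.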