Hi,

I am running a Flink job that uses the Queryable State feature of Apache Flink (1.3.2). Querying the state works in local mode, but when I try the same thing in cluster mode (YARN session), I get an akka.actor.ActorNotFound exception.
Please help me to understand what is missing.

*Exception Trace*

Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
    at java.lang.Thread.run(Thread.java:745)

*Client Creation Snippet*

        Configuration config = new Configuration();
        config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

        final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
                .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
                        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

        this.client = new QueryableStateClient(config, highAvailabilityServices);
    }