I think the exception message already states the problem: the job simply does not exist. You can verify that by running bin/flink list or by looking it up in the web interface.
The reason is that calling env.getStreamGraph.getJobGraph will generate a new JobGraph (not the one which is sent to the JobManager) and this JobGraph will get a new JobID assigned. Thus, the JobGraph which you send to the JobManager and the one you used to retrieve the JobID from are different. Cheers, Till On Wed, Sep 7, 2016 at 8:07 AM, pushpendra.jaiswal < pushpendra.jaiswa...@gmail.com> wrote: > Hi Stefan > > Please find below stack trace and code : > > java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not > found > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$ > jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1. > applyOrElse(JobManager.scala:684) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp( > AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$ > anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp( > AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply( > AbstractPartialFunction.scala:25) > at org.apache.flink.runtime.LogMessages$$anon$1.apply( > LogMessages.scala:33) > at org.apache.flink.runtime.LogMessages$$anon$1.apply( > LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction. > scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1. > applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager. 
> scala:123) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec( > ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. > runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker( > ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run( > ForkJoinWorkerThread.java:107) > > Code : > > class Aggregator(val stream: KeyedStream[Record, Long]) extends > Serializable > { > > def reduceFunction = new ReduceFunction[Record] { > override def reduce(t: Record, t1: Record): Record = { > val total = t + t1 > total > } > } > > val reducingStateDesc = new ReducingStateDescriptor[Record]("record > reducing descriptor", reduceFunction, classOf[Record]) > // reducingStateDesc.setQueryable("queryStore") > > def reduceToQueryable = { > stream.asQueryableState("queryStore", reducingStateDesc) > } > } > > class FlinkQuery[T](jobID: JobID, val serializer: > TypeSerializer[T],jobManagerIP:String, jobManagerPort:Int) extends > Serializable with LazyLogging { > > @Transient > private lazy val client = new QueryableStateClient(config) > > val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey) > kvState.onFailure(onFailure) > kvState.onSuccess(onSuccess) > > def onFailure = new PartialFunction[Throwable, String] { > override def isDefinedAt(x: Throwable): Boolean = true > override def apply(v1: Throwable): String = { > logger.error("failed to query " + v1.getLocalizedMessage) > } > } > > > def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] { > override def isDefinedAt(x: Array[Byte]): Boolean = x != Nil > > override def apply(v1: Array[Byte]) = { > logger.error("got result " + v1); > v1 > } > } > } > > 
class Driver { > val jobID = env.getStreamGraph.getJobGraph.getJobID > val aggregatedNQueryable = driver.aggregateWithQueryable(stream) > val queryStoreName = aggregatedNQueryable.getQueryableStateName > val serializer = aggregatedNQueryable.getKeySerializer > val valueSerializer = aggregatedNQueryable.getValueSerialize > } > > > > > > -- > View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/Fwd-Not-able-to- > query-Queryable-State-tp8808p8938.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >