Hi Stefan Please find below stack trace and code :
java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not found at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:684) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:123) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Code : class Aggregator(val stream: KeyedStream[Record, Long]) extends Serializable { def reduceFunction = new ReduceFunction[Record] { override def reduce(t: Record, t1: Record): Record = { val total = t + t1 total } } val reducingStateDesc = new ReducingStateDescriptor[Record]("record reducing descriptor", reduceFunction, classOf[Record]) // reducingStateDesc.setQueryable("queryStore") def reduceToQueryable = { stream.asQueryableState("queryStore", reducingStateDesc) } } class FlinkQuery[T](jobID: JobID, val serializer: TypeSerializer[T],jobManagerIP:String, jobManagerPort:Int) extends Serializable with LazyLogging { @Transient private lazy val client = new QueryableStateClient(config) val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey) kvState.onFailure(onFailure) kvState.onSuccess(onSuccess) def onFailure = new PartialFunction[Throwable, String] { override def isDefinedAt(x: Throwable): Boolean = true override def apply(v1: Throwable): String = { logger.error("failed to query " + v1.getLocalizedMessage) } } def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] { override def isDefinedAt(x: Array[Byte]): Boolean = x != Nil override def apply(v1: Array[Byte]) = { logger.error("got result " + v1); v1 } } } class Driver { val jobID = env.getStreamGraph.getJobGraph.getJobID val aggregatedNQueryable = driver.aggregateWithQueryable(stream) val queryStoreName = aggregatedNQueryable.getQueryableStateName val serializer = aggregatedNQueryable.getKeySerializer val valueSerializer = aggregatedNQueryable.getValueSerialize } -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Not-able-to-query-Queryable-State-tp8808p8938.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.