Hi Stefan

Please find the stack trace and code below:

java.lang.IllegalStateException: Job 81ca41b13e7be8feb99f064e5a9a4237 not
found
        at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1470)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:684)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:123)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Code : 

/**
 * Wraps a keyed stream of `Record`s and publishes a reduction of it as
 * queryable state under the name "queryStore".
 *
 * @param stream keyed stream (key type `Long`) whose records are reduced
 */
class Aggregator(val stream: KeyedStream[Record, Long]) extends Serializable
{

  /** Builds a reducer that combines two records with `Record`'s `+` operator. */
  def reduceFunction: ReduceFunction[Record] = new ReduceFunction[Record] {
    override def reduce(t: Record, t1: Record): Record = t + t1
  }

  // The descriptor name has to be a single-line string literal; a literal
  // broken across two lines is an unterminated string and does not compile.
  val reducingStateDesc = new ReducingStateDescriptor[Record](
    "record reducing descriptor", reduceFunction, classOf[Record])
//  reducingStateDesc.setQueryable("queryStore")

  /** Registers the reducing state on `stream` as queryable state named "queryStore". */
  def reduceToQueryable = {
    stream.asQueryableState("queryStore", reducingStateDesc)
  }
}

/**
 * Issues a queryable-state lookup against a running job and logs the outcome.
 *
 * NOTE(review): `config`, `queryName`, `keyHash` and `serializedKey` are not
 * defined in this snippet; they are assumed to be provided elsewhere.
 *
 * @param jobID          id of the job whose state is queried
 * @param serializer     serializer for values of type `T`
 * @param jobManagerIP   job manager host (not used in the visible code)
 * @param jobManagerPort job manager port (not used in the visible code)
 */
class FlinkQuery[T](
    jobID: JobID,
    val serializer: TypeSerializer[T],
    jobManagerIP: String,
    jobManagerPort: Int)
  extends Serializable with LazyLogging {

  // Scala's serialization marker is the lower-case `@transient`; the client is
  // rebuilt lazily instead of being shipped along with this Serializable class.
  @transient
  private lazy val client = new QueryableStateClient(config)

  val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
  kvState.onFailure(onFailure)
  kvState.onSuccess(onSuccess)

  /** Failure callback: accepts every throwable, logs it, and returns the logged message. */
  def onFailure = new PartialFunction[Throwable, String] {
    override def isDefinedAt(x: Throwable): Boolean = true

    override def apply(v1: Throwable): String = {
      // Build the message first so the declared String result is actually returned
      // (the logger call itself yields Unit).
      val message = "failed to query " + v1.getLocalizedMessage
      logger.error(message)
      message
    }
  }

  /** Success callback: logs the size of the serialized result and passes it through. */
  def onSuccess = new PartialFunction[Array[Byte], Array[Byte]] {
    // Comparing an array with `Nil` is always true; the meaningful guard is a null check.
    override def isDefinedAt(x: Array[Byte]): Boolean = x != null

    override def apply(v1: Array[Byte]) = {
      // A successful query is not an error, and an Array's toString is only an
      // identity hash, so log the payload size at info level instead.
      logger.info(s"got result of ${v1.length} bytes")
      v1
    }
  }
}

/**
 * Collects the identifiers and serializers needed to query the job's state.
 *
 * NOTE(review): `env`, `driver` and `stream` are not defined in this snippet;
 * they are assumed to be in scope elsewhere.
 */
class Driver {
  // NOTE(review): this reads the JobID from a job graph built locally via
  // getStreamGraph.getJobGraph. Presumably that id is not the one the cluster
  // assigns to the submitted job, which would explain "Job ... not found" when
  // querying - confirm, and use the id of the submitted job instead.
  val jobID = env.getStreamGraph.getJobGraph.getJobID
  val aggregatedNQueryable = driver.aggregateWithQueryable(stream)
  val queryStoreName = aggregatedNQueryable.getQueryableStateName
  val serializer = aggregatedNQueryable.getKeySerializer
  // The accessor name was truncated to `getValueSerialize` in the original paste.
  val valueSerializer = aggregatedNQueryable.getValueSerializer
}





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Not-able-to-query-Queryable-State-tp8808p8938.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Reply via email to