Hi,
I have set up a Flink cluster on my local machine. I created a Flink job
(TrackMaximumTemperature) and made its state queryable. I am using the
*github/streamingwithflink/chapter7/QueryableState.scala* example from the
*streaming-with-flink* repository
<https://github.com/streaming-with-flink>. Please find the
file attached.
Now I have the ID of the running job, but when I try to access the state, the
client throws an exception. I can see that the job is running, and I am using the correct
jobId. Checkpointing is also enabled in the original job, and I have set the
checkpointing-related properties in flink-conf.yaml. Am I
missing something? Any leads will be appreciated. Thank you :)
*Exception stack trace:*
Caused by:
org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could
not retrieve location of state=maxTemperature of
job=9a528bf3e1b650aed7e0b1e26d038ad5. Potential reasons are: i) the state
is not ready, or ii) the job does not exist.
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
at
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
at
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
package io.github.streamingwithflink.chapter7
import java.util.concurrent.CompletableFuture
import io.github.streamingwithflink.util.{SensorReading, SensorSource,
SensorTimeAssigner}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
/**
  * Streaming job that computes, per sensor, the maximum temperature of every
  * 10-second event-time window and publishes the latest result as queryable
  * state under the name "maxTemperature".
  */
object TrackMaximumTemperature {

  /**
    * Defines and executes the DataStream program.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    // NOTE(review): this explicitly creates a *local* environment. If the job is
    // meant to be submitted to a separately started cluster (and queried through
    // that cluster's queryable state proxy), StreamExecutionEnvironment
    // .getExecutionEnvironment is presumably what is wanted here -- confirm.
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

    // checkpoint every 10 seconds
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

    // use event time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // configure watermark interval
    env.getConfig.setAutoWatermarkInterval(1000L)

    // ingest sensor stream
    val sensorData: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)
      // assign timestamps and watermarks which are required for event time
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val tenSecsMaxTemps: DataStream[(String, Double)] = sensorData
      // project to sensor id and temperature
      .map(r => (r.id, r.temperature))
      // compute every 10 seconds the max temperature per sensor
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .max(1)

    // store latest value for each sensor in a queryable state;
    // the name given here is the one clients must use when querying
    tenSecsMaxTemps
      .keyBy(_._1)
      .asQueryableState("maxTemperature")

    // execute application
    env.execute("Track max temperature")
  }
}
/**
  * Console dashboard that periodically queries the "maxTemperature" queryable
  * state of a running job and prints one tab-separated line of temperatures
  * per refresh.
  */
object TemperatureDashboard {

  // queryable state proxy connection information.
  // can be looked up in logs of running QueryableStateJob
  val proxyHost: String = "localhost"
  val proxyPort: Int = 9069

  // jobId of running QueryableStateJob.
  // can be looked up in logs of running job or the web UI
  val jobId: String = "9a528bf3e1b650aed7e0b1e26d038ad5"

  // how many sensors to query
  val numSensors: Int = 5
  // how often to query (passed to Thread.sleep, i.e. milliseconds)
  val refreshInterval: Int = 10000

  /**
    * Prints a header line and then loops forever: sends one asynchronous query
    * per sensor, waits for all results and prints them.
    *
    * The loop only ends when a query or the sleep throws. The client is shut
    * down in a `finally` block so that it is released in that case; a shutdown
    * call placed after the infinite loop would never be reached.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // configure client with host and port of queryable state proxy
    val client = new QueryableStateClient(proxyHost, proxyPort)

    try {
      // keys to query: "sensor_1" .. "sensor_<numSensors>"
      val sensorKeys = (1 to numSensors).map(i => "sensor_" + i)

      // print header line of dashboard table
      println(sensorKeys.mkString("\t| "))

      // loop forever
      while (true) {
        // send out all async queries first (strict collection) ...
        val futures = sensorKeys.map(key => queryState(key, client))
        // ... then wait for the results; the temperature is the tuple's 2nd field
        val results = futures.map(_.get().value()._2)

        // print result
        println(results.map(t => f"$t%1.3f").mkString("\t| "))

        // wait to send out next queries
        Thread.sleep(refreshInterval)
      }
    } finally {
      client.shutdownAndWait()
    }
  }

  /**
    * Sends an asynchronous query for the "maxTemperature" state of one key.
    *
    * @param key    the key to look up (a sensor id such as "sensor_1")
    * @param client the queryable state client used to send the request
    * @return a future that completes with the queried value state
    */
  def queryState(key: String, client: QueryableStateClient):
      CompletableFuture[ValueState[(String, Double)]] = {
    client.getKvState[String, ValueState[(String, Double)], (String, Double)](
      JobID.fromHexString(jobId),
      "maxTemperature",
      key,
      Types.STRING,
      new ValueStateDescriptor[(String, Double)]("", Types.TUPLE[(String, Double)]))
  }
}