Hi, I was experimenting with the Queryable State feature and I am having
problems querying the state.
The code I use to produce the queryable state is:
    env.addSource(kafkaConsumer).map(
        e => e match {
          case LoginClickEvent(_, t) => ("login", 1, t)
          case LogoutClickEvent(_, t) => ("logout", 1, t)
          case ButtonClickEvent(_, _, t) => ("button", 1, t)
        }).keyBy(0).timeWindow(Time.seconds(1))
      .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
      .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
      .keyBy("key")
      .asQueryableState(
        "type-time-series-count",
        new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
          "type-time-series-count",
          classOf[KeyedDataPoint[java.lang.Integer]]))
As you can see, it is a rather simple job: I count events of each type
in one-second windows and then query the result by event type.
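To make the intent concrete, here is a minimal sketch of the counting logic in plain Scala collections, without Flink. The `Event` type, the sample values, and windowing by the event's own timestamp are assumptions made only for illustration; the actual job relies on Flink's `timeWindow`.

```scala
object WindowCountSketch {
  // Hypothetical simplified event: an event type and a timestamp in milliseconds.
  final case class Event(kind: String, timestamp: Long)

  // Groups events by (type, window index) and returns, for each group,
  // the number of events and the largest timestamp seen. This mirrors
  // the reduce step of the job, which sums the 1s and keeps the max timestamp.
  def countPerWindow(
      events: Seq[Event],
      windowMillis: Long = 1000L): Map[(String, Long), (Int, Long)] =
    events
      .groupBy(e => (e.kind, e.timestamp / windowMillis))
      .map { case (key, es) => (key, (es.size, es.map(_.timestamp).max)) }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("login", 100L),
      Event("login", 900L),
      Event("button", 500L),
      Event("login", 1200L))
    // Print one line per (type, window) group.
    countPerWindow(events).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the real job each of these per-window results becomes one `KeyedDataPoint` that is appended to the list state for its key, which is why the client expects a list back.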
In the client code I do:
        // Query Flink state
        val future = client.getKvState(jobId, "type-time-series-count",
          key.hashCode, seralizedKey)
        // Await async result
        val serializedResult: Array[Byte] = Await.result(
          future, new FiniteDuration(10, duration.SECONDS))
        // Deserialize response
        val results = deserializeResponse(serializedResult)
        results
      }

      private def deserializeResponse(serializedResult: Array[Byte]):
          util.List[KeyedDataPoint[lang.Integer]] = {
        KvStateRequestSerializer.deserializeList(serializedResult,
          getValueSerializer())
      }
While debugging the issue, I saw that the first element in the list is
deserialized correctly, but deserialization fails on the second one. It
looks as if the serialized result is broken. Do you have any idea whether
I am doing something wrong, or whether this is a bug?
The exception I get is:
java.io.EOFException: null
    at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
    at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
    at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
    at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
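The trace shows the reader running out of input inside a `readUTF` call. The same symptom (first element fine, `EOFException` on the second) can be reproduced in isolation with plain `java.io` streams. This is only a sketch: it does not use Flink's serializers, and the record layout and the truncation point are hypothetical, chosen to mimic the behaviour rather than to explain its cause.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInputStream, DataOutputStream}

object TruncatedListSketch {
  // Hypothetical record layout: key (UTF string), timestamp (long), value (int).
  def writeRecord(out: DataOutputStream, key: String, ts: Long, value: Int): Unit = {
    out.writeUTF(key)
    out.writeLong(ts)
    out.writeInt(value)
  }

  // Reads one record; throws java.io.EOFException if the input ends early.
  def readRecord(in: DataInputStream): (String, Long, Int) =
    (in.readUTF(), in.readLong(), in.readInt())

  // Two complete records written back to back: 19 bytes + 20 bytes.
  def sampleBytes(): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeRecord(out, "login", 1000L, 2)
    writeRecord(out, "logout", 2000L, 1)
    out.flush()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Keep the first record plus 3 bytes of the second one, so the
    // second readUTF sees a length prefix but not the whole string.
    val truncated = sampleBytes().take(22)
    val in = new DataInputStream(new ByteArrayInputStream(truncated))
    println(readRecord(in)) // first record is read completely
    readRecord(in)          // throws java.io.EOFException inside readUTF
  }
}
```

Whether my serialized result is actually truncated, or is instead being read with a layout that does not match how it was written, is exactly what I cannot tell from the client side.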
You can browse the exact code at: https://github.com/dawidwys/flink-intro
I would be grateful for any advice.
Regards
Dawid Wysakowicz