Hi Nico, Recently I've tried the queryable state a bit differently, by using ValueState with a value of a util.ArrayList and a ValueSerializer for util.ArrayList and it works as expected.
The non-working example you can browse here: https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219 The working example here: https://github.com/dawidwys/flink-intro/tree/master (The QueryableJob is in module flink-queryable-job and the QueryClient in flink-state-server) Sure, I am aware of the downfall of the ListState. I need it just for presentational purpose, but you may be right there might not be any production use for this state and it should be removed. Maybe the problem is just with the ListState and removing it would resolve also my problem :) Regards Dawid Wysakowicz 2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>: > Hi Dawid, > I'll try to reproduce the error in the next couple of days. Can you also > share > the value deserializer you use? Also, have you tried even smaller examples > in > the meantime? Did they work? > > As a side-note in general regarding the queryable state "sink" using > ListState > (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters > this operator will be stored forever and never cleaned. Eventually, it will > pile up too much memory and is thus of limited use. Maybe it should even be > removed from the API. > > > Nico > > On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote: > > Hey Ufuk. > > Did you maybe had a while to have a look at that problem? > > > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>: > > > Hey Dawid! Thanks for reporting this. I will try to have a look over > > > the course of the day. From a first impression, this seems like a bug > > > to me. > > > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz > > > > > > <wysakowicz.da...@gmail.com> wrote: > > > > Hi I was experimenting with the Query State feature and I have some > > > > > > problems > > > > > > > querying the state. > > > > > > > > The code which I use to produce the queryable state is: > > > > env.addSource(kafkaConsumer).map( > > > > > > > > e => e match { > > > > > > > > case LoginClickEvent(_, t) => ("login", 1, t) > > > > case LogoutClickEvent(_, t) => ("logout", 1, t) > > > > case ButtonClickEvent(_, _, t) => ("button", 1, t) > > > > > > > > }).keyBy(0).timeWindow(Time.seconds(1)) > > > > .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, > > > > e2._3))) > > > > .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, > e._2)) > > > > .keyBy("key") > > > > .asQueryableState( > > > > > > > > "type-time-series-count", > > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]]( > > > > > > > > "type-time-series-count", > > > > classOf[KeyedDataPoint[java.lang.Integer]])) > > > > > > > > As you see it is a rather simple job, in which I try to count events > of > > > > different types in windows and then query by event type. > > > > > > > > In client code I do: > > > > // Query Flink state > > > > val future = client.getKvState(jobId, "type-time-series-count", > > > > > > > > key.hashCode, seralizedKey) > > > > > > > > // Await async result > > > > val serializedResult: Array[Byte] = Await.result( > > > > > > > > future, new FiniteDuration( > > > > > > > > 10, > > > > duration.SECONDS)) > > > > > > > > // Deserialize response > > > > val results = deserializeResponse(serializedResult) > > > > > > > > results > > > > > > > > } > > > > > > > > private def deserializeResponse(serializedResult: Array[Byte]): > > > > util.List[KeyedDataPoint[lang > > > > > > > > .Integer]] = { > > > > > > > > KvStateRequestSerializer.deserializeList(serializedResult, > > > > > > > > getValueSerializer()) > > > > > > > > } > > > > > > > > As I was trying to debug the issue I see the first element in list > gets > > > > deserialized correctly, but it fails on the second one. It seems like > > > > the > > > > serialized result is broken. Do you have any idea if I am doing sth > > > > > > wrong or > > > > > > > there is some bug? > > > > > > > > > > > > The exception I get is: > > > > java.io.EOFException: null > > > > at > > > > org.apache.flink.runtime.util.DataInputDeserializer.readFully( > > > > > > DataInputDeserializer.java:157) > > > > > > > at > > > > org.apache.flink.runtime.util.DataInputDeserializer.readUTF( > > > > > > DataInputDeserializer.java:240) > > > > > > > at > > > > org.apache.flink.api.java.typeutils.runtime. > PojoSerializer.deserialize( > > > > > > PojoSerializer.java:386) > > > > > > > at > > > > org.apache.flink.runtime.query.netty.message. > KvStateRequestSerializer. > > > > > > deserializeList(KvStateRequestSerializer.java:487) > > > > > > > at > > > > com.dataartisans.stateserver.queryclient.QueryClient. > > > > > > deserializeResponse(QueryClient.scala:44) > > > > > > > You can browse the exact code at: https://github.com/dawidwys/ > > > > > > flink-intro > > > > > > > I would be grateful for any advice. > > > > > > > > Regards > > > > Dawid Wysakowicz >