Hi Nico,

I recently tried queryable state a bit differently: using a ValueState
whose value is a util.ArrayList, with a ValueSerializer for
util.ArrayList, and it works as expected.

The non-working example you can browse here:
https://github.com/dawidwys/flink-intro/tree/c66f01117b0fe3c0adc8923000543a70a6fe2219
The working example here:
https://github.com/dawidwys/flink-intro/tree/master
(The QueryableJob is in module flink-queryable-job and the QueryClient in
flink-state-server)

Sure, I am aware of the downside of the ListState. I need it just for
presentation purposes, but you may be right that there is no production
use for this state and that it should be removed.
Maybe the problem is just with the ListState, and removing it would also
resolve my problem :)
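
To illustrate the growth issue, here is a toy sketch in plain Scala. No Flink
is involved: StateGrowthSketch, appendOnly and latestOnly are made-up names,
and the maps only stand in for keyed state. An append-only list per key keeps
growing with every window result, while a single value per key stays constant
in size:

```scala
// Toy model only: plain Scala collections stand in for Flink keyed state.
object StateGrowthSketch {
  // ListState-like sink: every incoming element is appended and never removed.
  def appendOnly(events: Seq[(String, Int)]): Map[String, Vector[Int]] =
    events.foldLeft(Map.empty[String, Vector[Int]]) { case (state, (key, count)) =>
      state.updated(key, state.getOrElse(key, Vector.empty[Int]) :+ count)
    }

  // ValueState-like sink: each key keeps only the most recent element.
  def latestOnly(events: Seq[(String, Int)]): Map[String, Int] =
    events.foldLeft(Map.empty[String, Int]) { case (state, (key, count)) =>
      state.updated(key, count)
    }

  def main(args: Array[String]): Unit = {
    // One windowed result per key per second, for 1000 seconds (made-up input).
    val keys = Seq("login", "logout", "button")
    val events = for { second <- 1 to 1000; key <- keys } yield (key, second)

    val listSizes = appendOnly(events).map { case (k, v) => k -> v.size }
    println(s"append-only entries per key: $listSizes")
    println(s"latest-only state: ${latestOnly(events)}")
  }
}
```

With the sample input in main, the append-only variant ends up with 1000
entries per key, while the other variant holds one value per key.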

Regards
Dawid Wysakowicz


2017-01-13 18:50 GMT+01:00 Nico Kruber <n...@data-artisans.com>:

> Hi Dawid,
> I'll try to reproduce the error in the next couple of days. Can you also
> share the value deserializer you use? Also, have you tried even smaller
> examples in the meantime? Did they work?
>
> As a side note regarding the queryable state "sink" using ListState
> (".asQueryableState(<name>, ListStateDescriptor)"): everything that enters
> this operator will be stored forever and never cleaned up. Eventually it
> will consume too much memory, so it is of limited use. Maybe it should even
> be removed from the API.
>
>
> Nico
>
> On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> > Hey Ufuk,
> > Did you maybe have a moment to look at this problem?
> >
> > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > > the course of the day. From a first impression, this seems like a bug
> > > to me.
> > >
> > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > > <wysakowicz.da...@gmail.com> wrote:
> > > > Hi, I was experimenting with the Queryable State feature and I have
> > > > some problems querying the state.
> > > >
> > > > The code which I use to produce the queryable state is:
> > > >     env.addSource(kafkaConsumer).map(
> > > >       e => e match {
> > > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > > >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > > >       .keyBy("key")
> > > >       .asQueryableState(
> > > >         "type-time-series-count",
> > > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > > >           "type-time-series-count",
> > > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > >
> > > > As you can see, it is a rather simple job in which I try to count
> > > > events of different types in windows and then query by event type.
> > > >
> > > > In client code I do:
> > > >     // Query Flink state
> > > >     val future = client.getKvState(jobId, "type-time-series-count",
> > > >       key.hashCode, seralizedKey)
> > > >
> > > >     // Await async result
> > > >     val serializedResult: Array[Byte] = Await.result(
> > > >       future, new FiniteDuration(10, duration.SECONDS))
> > > >
> > > >     // Deserialize response
> > > >     val results = deserializeResponse(serializedResult)
> > > >
> > > >     results
> > > >   }
> > > >
> > > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > > >       getValueSerializer())
> > > >   }
> > > >
> > > > While debugging the issue I saw that the first element in the list
> > > > gets deserialized correctly, but it fails on the second one. It seems
> > > > like the serialized result is broken. Do you have any idea whether I
> > > > am doing something wrong or whether there is a bug?
> > > >
> > > >
> > > > The exception I get is:
> > > > java.io.EOFException: null
> > > > at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > > at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > > at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > > at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > >
> > > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > > >
> > > > I would be grateful for any advice.
> > > >
> > > > Regards
> > > > Dawid Wysakowicz
>
