I tried to create a sample project, but couldn't reproduce the error there! It was working fine. Turns out I was importing Tuple2 from the wrong package in my client code :( After fixing the import, the code worked fine.
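For anyone who lands here with the same symptom: a cheap way to catch this kind of mix-up is to check the key's runtime class on the client before sending the query. Below is a minimal sketch of such a guard, not part of Flink. `KeyTypeGuard` and `requireType` are hypothetical names made up for illustration, and the expected class name is whatever your job is actually keyed by. If the job uses Flink's Java tuples, that would presumably be `org.apache.flink.api.java.tuple.Tuple2`; the thread does not say which other package was imported by mistake.

```java
/** Hypothetical helper: fails fast when a query key is not of the class the job was keyed with. */
final class KeyTypeGuard {

    private KeyTypeGuard() {}

    /**
     * Returns the key unchanged if its runtime class has exactly the expected
     * fully qualified name; otherwise throws and names both classes.
     */
    static <K> K requireType(K key, String expectedClassName) {
        String actual = key.getClass().getName();
        if (!actual.equals(expectedClassName)) {
            throw new IllegalArgumentException(
                "Query key is a " + actual + " but the job is keyed by " + expectedClassName);
        }
        return key;
    }

    public static void main(String[] args) {
        java.util.UUID id = java.util.UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

        // Passes: the runtime class matches the expected name.
        requireType(id, "java.util.UUID");

        // Rejected: a String is passed where a UUID key is expected.
        try {
            requireType(id.toString(), "java.util.UUID");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The comparison is on the fully qualified name because two classes called Tuple2 in different packages look identical in source code once imported, while their serialized forms generally do not match.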
Thanks Till and Jiayi for your help!

Jayant Ameta


On Tue, Nov 13, 2018 at 4:01 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jayant,
>
> could you maybe set up a small GitHub project with the client and server
> code? Otherwise it is really hard to reproduce the problem. Thanks a lot!
>
> Cheers,
> Till
>
> On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta <wittyam...@gmail.com>
> wrote:
>
>> Getting the same error even when I added the flink-avro dependency to the
>> client.
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <bupt_...@163.com> wrote:
>>
>>> Hi Jayant,
>>>
>>> I don’t know why Flink uses the Avro serializer, which is usually
>>> used for POJO classes, but from the error messages, I think you can add
>>> flink-avro as a dependency and try again.
>>>
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>> Original Message
>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>> *Cc:* trohrmann<trohrm...@apache.org>; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>> *Date:* Tuesday, Nov 13, 2018 16:15
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Thanks Jiayi,
>>> I updated the client code to use the keyed stream's key. The key is a
>>> Tuple2<UUID, String>
>>>
>>> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>     client.getKvState(
>>>         JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>>>         "rules",
>>>         Tuple2.of(uuid, "test"),
>>>         TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>>>         descriptor);
>>>
>>> I'm now getting a different exception. I'm NOT using Avro as a custom
>>> serializer. Not sure what causes this issue.
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request with ID 21.
>>> Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
>>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170) >>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >>> at >>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94) >>> at >>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) >>> at >>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>> at >>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >>> at >>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>> at >>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>> at >>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>> at >>> 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> ... 1 more
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:
>>>
>>>> Hi, Jayant
>>>>
>>>> The key you specify in the getKvState function should be the key of
>>>> the keyed stream, not the key of the map. From what I’ve seen on
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
>>>> this feature only supports managed keyed state.
>>>>
>>>> By the way, I think we should improve the error message that Jayant
>>>> ran into.
>>>>
>>>> Best,
>>>> Jiayi Liao
>>>>
>>>> Original Message
>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>> *Recipient:* trohrmann<trohrm...@apache.org>
>>>> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<
>>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>>> *Date:* Tuesday, Nov 13, 2018 13:39
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>> Exception
>>>>
>>>> Hi Till,
>>>> Here is the client snippet. Rule is a custom POJO that I use.
>>>>
>>>> public static void main(String[] args)
>>>>         throws IOException, InterruptedException, ExecutionException {
>>>>     UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>>>>
>>>>     QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>>>>     ExecutionConfig config = new ExecutionConfig();
>>>>     client.setExecutionConfig(config);
>>>>
>>>>     MapStateDescriptor<UUID, Rule> descriptor =
>>>>         new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>>>>     CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>>         client.getKvState(
>>>>             JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>>>>             "rules",
>>>>             uuid, TypeInformation.of(UUID.class), descriptor);
>>>>
>>>>     while (!resultFuture.isDone()) {
>>>>         Thread.sleep(1000);
>>>>     }
>>>>     resultFuture.whenComplete((result, throwable) -> {
>>>>         if (throwable != null) {
>>>>             throwable.printStackTrace();
>>>>         } else {
>>>>             try {
>>>>                 System.out.println(result.get(uuid));
>>>>             } catch (Exception e) {
>>>>                 e.printStackTrace();
>>>>             }
>>>>         }
>>>>     });
>>>> }
>>>>
>>>>
>>>> Below is the stack trace:
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while processing request with ID 12.
>>>> Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
>>>> at >>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107) >>>> at >>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93) >>>> at >>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87) >>>> at >>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>>> at >>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>> at java.lang.Thread.run(Thread.java:748) >>>> Caused by: java.io.EOFException >>>> at >>>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307) >>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770) >>>> at >>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69) >>>> at >>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28) >>>> at >>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) >>>> at >>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >>>> at >>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94) >>>> ... 
9 more >>>> >>>> at >>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98) >>>> at >>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>>> at >>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> at >>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >>>> at >>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>>> at >>>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) >>>> at >>>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) >>>> at >>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>>> at >>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> at >>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) >>>> at >>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) >>>> at >>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) >>>> at >>>> 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) >>>> at >>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>>> at >>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>>> at >>>> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324) >>>> at >>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>>> at >>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >>>> at >>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>>> at >>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>>> at >>>> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563) >>>> at >>>> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) >>>> at >>>> 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >>>> at >>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) >>>> at java.lang.Thread.run(Thread.java:748) >>>> >>>> Jayant Ameta >>>> >>>> >>>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org> >>>> wrote: >>>> >>>>> Could you send us a small example program which we can use to >>>>> reproduce the problem? >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com> >>>>> wrote: >>>>> >>>>>> Yeah, it IS using Kryo serializer. >>>>>> >>>>>> Jayant Ameta >>>>>> >>>>>> >>>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org> >>>>>> wrote: >>>>>> >>>>>>> Hi Jayant, could you check that the UUID key on the TM is actually >>>>>>> serialized using a Kryo serializer? You can do this by setting a >>>>>>> breakpoint >>>>>>> in the constructor of the `AbstractKeyedStateBackend`. 
>>>>>>> >>>>>>> Cheers, >>>>>>> Till >>>>>>> >>>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote: >>>>>>> >>>>>>>> Hi, Jayant >>>>>>>> >>>>>>>> Your code looks good to me. And I’ve tried the >>>>>>>> serialize/deserialize of Kryo on UUID class, it all looks okay. >>>>>>>> >>>>>>>> I’m not very sure about this problem. Maybe you can write a >>>>>>>> very simple demo to try if it works. >>>>>>>> >>>>>>>> >>>>>>>> Jiayi Liao, Best >>>>>>>> >>>>>>>> Original Message >>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com> >>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user< >>>>>>>> user@flink.apache.org> >>>>>>>> *Date:* Monday, Oct 29, 2018 11:53 >>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>>>>>> Exception >>>>>>>> >>>>>>>> Hi Jiayi, >>>>>>>> Any further help on this? >>>>>>>> >>>>>>>> Jayant Ameta >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> MapStateDescriptor<UUID, String> descriptor = new >>>>>>>>> MapStateDescriptor<>("rulePatterns", UUID.class, >>>>>>>>> String.class); >>>>>>>>> >>>>>>>>> Jayant Ameta >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> Can you show us the descriptor in the codes below? 
>>>>>>>>>> >>>>>>>>>> client.getKvState(JobID.fromHexString( >>>>>>>>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule", >>>>>>>>>> >>>>>>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"), >>>>>>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor); >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> Jiayi Liao, Best >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Original Message >>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com> >>>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user< >>>>>>>>>> user@flink.apache.org> >>>>>>>>>> *Date:* Friday, Oct 26, 2018 02:26 >>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>>>>>>>> Exception >>>>>>>>>> >>>>>>>>>> Also, I haven't provided any custom serializer in my flink job. >>>>>>>>>> Shouldn't the same configuration work for queryable state client? >>>>>>>>>> >>>>>>>>>> Jayant Ameta >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta < >>>>>>>>>> wittyam...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Gordon, >>>>>>>>>>> Following is the stack trace that I'm getting: >>>>>>>>>>> >>>>>>>>>>> *Exception in thread "main" >>>>>>>>>>> java.util.concurrent.ExecutionException: >>>>>>>>>>> java.lang.RuntimeException: Failed >>>>>>>>>>> request 0.* >>>>>>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.* >>>>>>>>>>> * Caused by: java.lang.RuntimeException: Error while processing >>>>>>>>>>> request with ID 0. 
Caused by: >>>>>>>>>>> com.esotericsoftware.kryo.KryoException: >>>>>>>>>>> Encountered unregistered class ID: -985346241* >>>>>>>>>>> *Serialization trace:* >>>>>>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)* >>>>>>>>>>> * at >>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)* >>>>>>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)* >>>>>>>>>>> * at >>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)* >>>>>>>>>>> * at >>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)* >>>>>>>>>>> * at >>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)* >>>>>>>>>>> * at >>>>>>>>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)* >>>>>>>>>>> * at >>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)* >>>>>>>>>>> * at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)* >>>>>>>>>>> * at >>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)* >>>>>>>>>>> * at >>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)* >>>>>>>>>>> * at java.lang.Thread.run(Thread.java:748)* >>>>>>>>>>> >>>>>>>>>>> I am not using any custom serialize as mentioned by Jiayi. >>>>>>>>>>> >>>>>>>>>>> Jayant Ameta >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Jayant, >>>>>>>>>>>> >>>>>>>>>>>> There should be a Serializer parameter in the constructor of >>>>>>>>>>>> the StateDescriptor, you should create a new serializer like this: >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env >>>>>>>>>>>> .getConfig) >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> By the way, can you show us your kryo exception like what >>>>>>>>>>>> Gordon said? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Jiayi Liao, Best >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Original Message >>>>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org> >>>>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy< >>>>>>>>>>>> bupt_...@163.com> >>>>>>>>>>>> *Cc:* user<user@flink.apache.org> >>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18 >>>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>>>>>>>>>> Exception >>>>>>>>>>>> >>>>>>>>>>>> Hi Jayant, >>>>>>>>>>>> >>>>>>>>>>>> What is the Kryo exception message that you are getting? >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> Gordon >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta ( >>>>>>>>>>>> wittyam...@gmail.com) wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> I've not configured any serializer in the descriptor. (Neither >>>>>>>>>>>> in flink job, nor in state query client). 
>>>>>>>>>>>> Which serializer should I use? >>>>>>>>>>>> >>>>>>>>>>>> Jayant Ameta >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> It seems that your codes are right. Are you sure that >>>>>>>>>>>>> you’re using the same Serializer as the Flink program do? Could >>>>>>>>>>>>> you show >>>>>>>>>>>>> the serializer in descriptor? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Jiayi Liao, Best >>>>>>>>>>>>> >>>>>>>>>>>>> Original Message >>>>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>>>>>>>>>>> *Recipient:* user<user@flink.apache.org> >>>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17 >>>>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro >>>>>>>>>>>>> Exception >>>>>>>>>>>>> >>>>>>>>>>>>> I get Kyro exception when querying the state. >>>>>>>>>>>>> >>>>>>>>>>>>> Key: UUID >>>>>>>>>>>>> MapState<UUID, String> >>>>>>>>>>>>> >>>>>>>>>>>>> Client code snippet: >>>>>>>>>>>>> >>>>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture = >>>>>>>>>>>>> >>>>>>>>>>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), >>>>>>>>>>>>> "rule", >>>>>>>>>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"), >>>>>>>>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor); >>>>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, >>>>>>>>>>>>> TimeUnit.SECONDS); >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Any better way to query it? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Jayant Ameta >>>>>>>>>>>>> >>>>>>>>>>>>
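
The client snippets in this thread wait on the result either by polling isDone() in a sleep loop or by calling get(10, TimeUnit.SECONDS). A minimal sketch of the second style follows, using only the JDK. It assumes nothing about Flink beyond the fact that getKvState returns a CompletableFuture; `FutureWait`, `await` and `rootCause` are hypothetical helper names. It waits with a bound and surfaces the innermost cause of whatever exception chain the client actually receives, which is usually the interesting line in traces like the ones above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for waiting on a query result with a bound. */
final class FutureWait {

    private FutureWait() {}

    /** Walks the cause chain down to the innermost exception. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Blocks for at most timeoutSeconds and returns the value. Failures are
     * rethrown as IllegalStateException carrying the innermost cause.
     */
    static <T> T await(CompletableFuture<T> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            Throwable root = rootCause(e);
            throw new IllegalStateException("Query failed: " + root, root);
        } catch (TimeoutException e) {
            throw new IllegalStateException("No response within " + timeoutSeconds + "s", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // A future that already holds a value returns immediately.
        System.out.println(await(CompletableFuture.completedFuture("rule-1"), 10));

        // A failed future: the wrapper exception is skipped, the innermost cause is reported.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(
            new RuntimeException("Failed request 0.", new java.io.EOFException()));
        try {
            await(failed, 10);
        } catch (IllegalStateException e) {
            System.out.println(e.getCause());
        }
    }
}
```

Compared with the sleep loop, a bounded get returns as soon as the future completes and cannot hang forever if the server never answers.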