Hi Jayant,

could you maybe set up a small GitHub project with the client and server code? Otherwise it is really hard to reproduce the problem. Thanks a lot!
Cheers,
Till

On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta <wittyam...@gmail.com> wrote:

> Getting the same error even when I added flink-avro dependency to the
> client.
>
> Jayant Ameta
>
>
> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <bupt_...@163.com> wrote:
>
>> Hi Jayant,
>>
>> I don’t know why Flink uses the Avro serializer, which is usually used
>> for POJO classes, but from the error messages, I think you can add flink-avro
>> as a dependency and try again.
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>> Original Message
>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>> *Recipient:* bupt_ljy<bupt_...@163.com>
>> *Cc:* trohrmann<trohrm...@apache.org>; Tzu-Li (Gordon) Tai<
>> tzuli...@apache.org>; user<user@flink.apache.org>
>> *Date:* Tuesday, Nov 13, 2018 16:15
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Thanks Jiayi,
>> I updated the client code to use the keyed stream's key. The key is a
>> Tuple2<UUID, String>
>>
>> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>
>> client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>> "rules",
>> Tuple2.of(uuid, "test"), TypeInformation.of(new
>> TypeHint<Tuple2<UUID, String>>() {
>> }), descriptor);
>>
>> I'm now getting a different exception. I'm NOT using Avro as a custom
>> serializer. Not sure what causes this issue.
>>
>> Caused by: java.lang.RuntimeException: Error while processing request with
>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find
>> required Avro dependency.
>> at >> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >> at >> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94) >> at >> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93) >> at >> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87) >> at >> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >> at >> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:748) >> >> at >> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98) >> at >> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >> at >> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:748) >> >> at >> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >> at >> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >> at >> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) >> at >> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) >> at >> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >> at >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:748) >> >> at >> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >> at >> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >> at >> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >> at >> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >> at >> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >> at >> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) >> at >> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >> at >> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >> at >> 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> ... 1 more
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:
>>
>>> Hi, Jayant
>>>
>>> The key you specified in the getKvState function should be the key of the
>>> keyed stream instead of the key of the map. From what I’ve seen on
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
>>> this feature only supports managed keyed state.
>>>
>>> By the way, I think we should improve the error messages for the case
>>> that Jayant ran into.
>>>
>>> Best,
>>> Jiayi Liao
>>>
>>> Original Message
>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>> *Recipient:* trohrmann<trohrm...@apache.org>
>>> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>> *Date:* Tuesday, Nov 13, 2018 13:39
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Till,
>>> Here is the client snippet. Here Rule is a custom POJO that I use.
>>>
>>> public static void main(String[] args)
>>>     throws IOException, InterruptedException, ExecutionException {
>>>   UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>>>
>>>   QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>>>   ExecutionConfig config = new ExecutionConfig();
>>>   client.setExecutionConfig(config);
>>>
>>>   MapStateDescriptor<UUID, Rule> descriptor =
>>>       new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>>>   CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>       client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"),
>>>           "rules", uuid, TypeInformation.of(UUID.class), descriptor);
>>>
>>>   while (!resultFuture.isDone()) {
>>>     Thread.sleep(1000);
>>>   }
>>>   resultFuture.whenComplete((result, throwable) -> {
>>>     if (throwable != null) {
>>>       throwable.printStackTrace();
>>>     } else {
>>>       try {
>>>         System.out.println(result.get(uuid));
>>>       } catch (Exception e) {
>>>         e.printStackTrace();
>>>       }
>>>     }
>>>   });
>>> }
>>>
>>>
>>> Below is the stack trace:
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request
>>> with ID 12. Caused by: java.io.IOException: Unable to deserialize key and
>>> namespace. This indicates a mismatch in the key/namespace serializers used
>>> by the KvState instance and this access.
>>> at >>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107) >>> at >>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: java.io.EOFException >>> at >>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307) >>> at org.apache.flink.types.StringValue.readString(StringValue.java:770) >>> at >>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69) >>> at >>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) >>> at >>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >>> at >>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94) >>> ... 
9 more >>> >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98) >>> at >>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) >>> at >>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) >>> at >>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) >>> at >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) >>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>> at >>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>> at >>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> at >>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) >>> at >>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) >>> at >>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) >>> at >>> 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) >>> at >>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>> at >>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>> at >>> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324) >>> at >>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) >>> at >>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) >>> at >>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) >>> at >>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) >>> at >>> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563) >>> at >>> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>> at >>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) >>> at 
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >>> at >>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >>> at >>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> Jayant Ameta >>> >>> >>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org> >>> wrote: >>> >>>> Could you send us a small example program which we can use to reproduce >>>> the problem? >>>> >>>> Cheers, >>>> Till >>>> >>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com> >>>> wrote: >>>> >>>>> Yeah, it IS using Kryo serializer. >>>>> >>>>> Jayant Ameta >>>>> >>>>> >>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org> >>>>> wrote: >>>>> >>>>>> Hi Jayant, could you check that the UUID key on the TM is actually >>>>>> serialized using a Kryo serializer? You can do this by setting a >>>>>> breakpoint >>>>>> in the constructor of the `AbstractKeyedStateBackend`. >>>>>> >>>>>> Cheers, >>>>>> Till >>>>>> >>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote: >>>>>> >>>>>>> Hi, Jayant >>>>>>> >>>>>>> Your code looks good to me. 
And I’ve tried serializing and deserializing the UUID class with Kryo,
>>>>>>> and it all looks okay.
>>>>>>>
>>>>>>> I’m not very sure about this problem. Maybe you can write a very
>>>>>>> simple demo to check whether it works.
>>>>>>>
>>>>>>>
>>>>>>> Jiayi Liao, Best
>>>>>>>
>>>>>>> Original Message
>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>> user@flink.apache.org>
>>>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>> Exception
>>>>>>>
>>>>>>> Hi Jiayi,
>>>>>>> Any further help on this?
>>>>>>>
>>>>>>> Jayant Ameta
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> MapStateDescriptor<UUID, String> descriptor = new
>>>>>>>> MapStateDescriptor<>("rulePatterns", UUID.class,
>>>>>>>> String.class);
>>>>>>>>
>>>>>>>> Jayant Ameta
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Can you show us the descriptor in the code below?
>>>>>>>>>
>>>>>>>>> client.getKvState(JobID.fromHexString(
>>>>>>>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>
>>>>>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Original Message
>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>>>> user@flink.apache.org>
>>>>>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>> Exception
>>>>>>>>>
>>>>>>>>> Also, I haven't provided any custom serializer in my Flink job.
>>>>>>>>> Shouldn't the same configuration work for the queryable state client?
>>>>>>>>>
>>>>>>>>> Jayant Ameta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gordon,
>>>>>>>>>> Following is the stack trace that I'm getting:
>>>>>>>>>>
>>>>>>>>>> *Exception in thread "main"
>>>>>>>>>> java.util.concurrent.ExecutionException: java.lang.RuntimeException:
>>>>>>>>>> Failed
>>>>>>>>>> request 0.*
>>>>>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>>>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>>>>>>> request with ID 0. Caused by:
>>>>>>>>>> com.esotericsoftware.kryo.KryoException:
>>>>>>>>>> Encountered unregistered class ID: -985346241*
>>>>>>>>>> *Serialization trace:*
>>>>>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>>>>>>>>> * at
>>>>>>>>>>
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>>>>>>>>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>>>>>>>>>> * at java.lang.Thread.run(Thread.java:748)*
>>>>>>>>>>
>>>>>>>>>> I am not using any custom serializer as mentioned by Jiayi.
>>>>>>>>>>
>>>>>>>>>> Jayant Ameta
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jayant,
>>>>>>>>>>>
>>>>>>>>>>> There should be a Serializer parameter in the constructor of
>>>>>>>>>>> the StateDescriptor; you should create a new serializer like this:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> By the way, can you show us your Kryo exception, as Gordon
>>>>>>>>>>> asked?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Original Message
>>>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>>>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<
>>>>>>>>>>> bupt_...@163.com>
>>>>>>>>>>> *Cc:* user<user@flink.apache.org>
>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>>>> Exception
>>>>>>>>>>>
>>>>>>>>>>> Hi Jayant,
>>>>>>>>>>>
>>>>>>>>>>> What is the Kryo exception message that you are getting?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Gordon
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (
>>>>>>>>>>> wittyam...@gmail.com) wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> I've not configured any serializer in the descriptor (neither
>>>>>>>>>>> in the Flink job, nor in the state query client).
>>>>>>>>>>> Which serializer should I use?
>>>>>>>>>>>
>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> It seems that your code is right. Are you sure that you’re
>>>>>>>>>>>> using the same Serializer as the Flink program does? Could you show
>>>>>>>>>>>> the
>>>>>>>>>>>> serializer in the descriptor?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>>
>>>>>>>>>>>> Original Message
>>>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>>>> *Recipient:* user<user@flink.apache.org>
>>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro
>>>>>>>>>>>> Exception
>>>>>>>>>>>>
>>>>>>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>>>>>>
>>>>>>>>>>>> Key: UUID
>>>>>>>>>>>> MapState<UUID, String>
>>>>>>>>>>>>
>>>>>>>>>>>> Client code snippet:
>>>>>>>>>>>>
>>>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>>>>>>
>>>>>>>>>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>>>>>>>>>> "rule",
>>>>>>>>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10,
>>>>>>>>>>>> TimeUnit.SECONDS);
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any better way to query it?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>>
>>>>>>>>>>>
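
For readers following the thread: the java.io.EOFException in the Nov 13 trace is consistent with Jiayi's point that the key passed to getKvState has to be the keyed stream's key (a Tuple2<UUID, String> in the job discussed here) rather than the map key. The following is only a rough, standalone sketch of that kind of mismatch. It uses plain java.io streams, not Flink's serializers or wire format, and the class and method names (KeyMismatchSketch, encodeUuid, encodeUuidAndString, decodeUuidAndString) are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.UUID;

/** Toy model of a key serializer mismatch; this is NOT Flink's wire format. */
public class KeyMismatchSketch {

    /** Encodes only the UUID, as a client would if it passed the map key. */
    public static byte[] encodeUuid(UUID id) {
        return encode(id, null);
    }

    /** Encodes a (UUID, String) pair, standing in for the keyed stream's Tuple2 key. */
    public static byte[] encodeUuidAndString(UUID id, String name) {
        return encode(id, name);
    }

    private static byte[] encode(UUID id, String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
            if (name != null) {
                out.writeUTF(name);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for in-memory streams
        }
        return bytes.toByteArray();
    }

    /**
     * Decodes a (UUID, String) pair, as the side that owns the state would.
     * Returns empty when the bytes end before the String field.
     */
    public static Optional<String> decodeUuidAndString(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            UUID id = new UUID(in.readLong(), in.readLong());
            String name = in.readUTF(); // throws EOFException if no bytes are left
            return Optional.of(id + "/" + name);
        } catch (EOFException e) {
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
        // Matching key shape: decoding succeeds.
        System.out.println(decodeUuidAndString(encodeUuidAndString(id, "test")).isPresent());
        // UUID only: the reader runs out of bytes while reading the String field.
        System.out.println(decodeUuidAndString(encodeUuid(id)).isPresent());
    }
}
```

In this toy model the reader expects two fields but the writer supplied one, so the read fails at the second field. That is the same shape as the failure inside StringSerializer under TupleSerializer in the trace, although the real cause in any given job should be confirmed against the actual key type and serializers.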