I tried to create a sample project, but couldn't reproduce the error! It
was working fine.
It turns out I was importing Tuple2 from the wrong package in my client code :(
After fixing it, the code worked fine.
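
For anyone who lands here with the same symptom: Flink's Java tuple is org.apache.flink.api.java.tuple.Tuple2, and a class called Tuple2 from any other package (scala.Tuple2, for example) is a different type. The thread does not record which package was imported by mistake, so the sketch below only illustrates the general effect with two stand-in classes, FlinkLike.Tuple2 and Other.Tuple2, and a hypothetical serializerFor lookup. None of these names are Flink APIs; they are assumptions made for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Tuple2PackageCheck {
    // Stand-ins for two packages that each define a class named Tuple2 (hypothetical).
    static final class FlinkLike { static final class Tuple2<A, B> { A f0; B f1; } }
    static final class Other { static final class Tuple2<A, B> { A first; B second; } }

    // Hypothetical registry: only the exact class the job used has a dedicated serializer.
    static final Map<Class<?>, String> KNOWN = new LinkedHashMap<>();
    static {
        KNOWN.put(FlinkLike.Tuple2.class, "tuple-serializer");
    }

    // Any class that is not registered falls back to a generic serializer.
    static String serializerFor(Class<?> type) {
        return KNOWN.getOrDefault(type, "generic-fallback");
    }

    public static void main(String[] args) {
        // Same simple name, different classes.
        System.out.println(FlinkLike.Tuple2.class.getSimpleName()
            .equals(Other.Tuple2.class.getSimpleName()));
        System.out.println(FlinkLike.Tuple2.class.equals(Other.Tuple2.class));
        // The lookup therefore picks different serializers for the two classes.
        System.out.println(serializerFor(FlinkLike.Tuple2.class));
        System.out.println(serializerFor(Other.Tuple2.class));
    }
}
```

If the client and the job end up with different serializers for the key, the bytes the client sends will likely not match what the server expects, which would be consistent with the deserialization errors quoted below. Checking the import line for Tuple2 in the client is a quick first step.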

Thanks Till and Jiayi for your help!

Jayant Ameta


On Tue, Nov 13, 2018 at 4:01 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Jayant,
>
> could you maybe set up a small GitHub project with the client and server
> code? Otherwise it is really hard to reproduce the problem. Thanks a lot!
>
> Cheers,
> Till
>
> On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta <wittyam...@gmail.com>
> wrote:
>
>> Getting the same error even when I added flink-avro dependency to the
>> client.
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <bupt_...@163.com> wrote:
>>
>>> Hi Jayant,
>>>
>>>    I don’t know why Flink is using the Avro serializer, which is usually
>>> used for POJO classes, but based on the error message, I think you can
>>> add flink-avro as a dependency and try again.
>>>
>>>
>>> Best,
>>>
>>> Jiayi Liao
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>> *Cc:* trohrmann<trohrm...@apache.org>; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>> *Date:* Tuesday, Nov 13, 2018 16:15
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Thanks Jiayi,
>>> I updated the client code to use keyed stream key. The key is a
>>> Tuple2<UUID, String>
>>>
>>> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>     client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
>>>         Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>>>         descriptor);
>>>
>>> I'm now getting a different exception. I'm NOT using Avro as a custom
>>> serializer. Not sure what causes this issue.
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request with 
>>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
>>> required Avro dependency.
>>>     at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>     at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>     at 
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at 
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>     at 
>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>     at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>     at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>     at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>     at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>>     at 
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>     at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>     at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>>     at 
>>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>>     at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>     at 
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>     at 
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>     at 
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>     at 
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>     at 
>>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>>>     at 
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>     at 
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>     at 
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     ... 1 more
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:
>>>
>>>> Hi, Jayant
>>>>
>>>>    The key you pass to getKvState should be the key of the keyed
>>>> stream, not the key of the map. From what I’ve seen on
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
>>>> this feature only supports managed keyed state.
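
The distinction between the two keys can be pictured with plain Java maps. This is only a sketch under stated assumptions: KeyedMapStateSketch, put and query are hypothetical names, and a String stands in for the keyed-stream key. It is not how Flink stores state internally; it only shows that a query selects the state by stream key first, and the map key is applied to the returned map afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class KeyedMapStateSketch {
    // Outer key: the keyed-stream key. Inner map: the contents of the map state for that key.
    static final Map<String, Map<UUID, String>> STATE_BY_STREAM_KEY = new HashMap<>();

    // Store a value in the map state that belongs to the given stream key.
    static void put(String streamKey, UUID mapKey, String value) {
        STATE_BY_STREAM_KEY.computeIfAbsent(streamKey, k -> new HashMap<>()).put(mapKey, value);
    }

    // A query names the stream key and gets back the whole map for that key.
    static Map<UUID, String> query(String streamKey) {
        return STATE_BY_STREAM_KEY.getOrDefault(streamKey, Collections.emptyMap());
    }

    public static void main(String[] args) {
        UUID ruleId = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
        put("tenant-a", ruleId, "rule-1");

        // Querying with the stream key finds the map; the map key is used afterwards.
        System.out.println(query("tenant-a").get(ruleId));
        // Querying with the map key in place of the stream key finds nothing.
        System.out.println(query(ruleId.toString()).isEmpty());
    }
}
```
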
>>>>
>>>>    By the way, I think we should improve the error message that
>>>> Jayant ran into.
>>>>
>>>> Best,
>>>> Jiayi Liao
>>>>
>>>>  Original Message
>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>> *Recipient:* trohrmann<trohrm...@apache.org>
>>>> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<
>>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>>> *Date:* Tuesday, Nov 13, 2018 13:39
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>> Exception
>>>>
>>>> Hi Till,
>>>> Here is the client snippet. Here Rule is a custom POJO that I use.
>>>>
>>>> public static void main(String[] args)
>>>>     throws IOException, InterruptedException, ExecutionException {
>>>>   UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>>>>
>>>>   QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>>>>   ExecutionConfig config = new ExecutionConfig();
>>>>   client.setExecutionConfig(config);
>>>>
>>>>   MapStateDescriptor<UUID, Rule> descriptor =
>>>>       new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>>>>   CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>>       client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
>>>>           uuid, TypeInformation.of(UUID.class), descriptor);
>>>>
>>>>   while (!resultFuture.isDone()) {
>>>>     Thread.sleep(1000);
>>>>   }
>>>>   resultFuture.whenComplete((result, throwable) -> {
>>>>     if (throwable != null) {
>>>>       throwable.printStackTrace();
>>>>     } else {
>>>>       try {
>>>>         System.out.println(result.get(uuid));
>>>>       } catch (Exception e) {
>>>>         e.printStackTrace();
>>>>       }
>>>>     }
>>>>   });
>>>> }
>>>>
>>>>
>>>> Below is the stack trace:
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while processing request
>>>> with ID 12. Caused by: java.io.IOException: Unable to deserialize key and
>>>> namespace. This indicates a mismatch in the key/namespace serializers used
>>>> by the KvState instance and this access.
>>>> at
>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
>>>> at
>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>> at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>> at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>> at
>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.EOFException
>>>> at
>>>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>> at
>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>> at
>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> at
>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>> ... 9 more
>>>>
>>>> at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>>> at
>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>> at
>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> at
>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>>> at
>>>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>>> at
>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> at
>>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>> at
>>>> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>>> at
>>>> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
>>>> at
>>>> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>>> at
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Could you send us a small example program which we can use to
>>>>> reproduce the problem?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah, it IS using Kryo serializer.
>>>>>>
>>>>>> Jayant Ameta
>>>>>>
>>>>>>
>>>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>>>>>> serialized using a Kryo serializer? You can do this by setting a
>>>>>>> breakpoint in the constructor of the `AbstractKeyedStateBackend`.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>
>>>>>>>> Hi, Jayant
>>>>>>>>
>>>>>>>>     Your code looks good to me. I’ve also tried serializing and
>>>>>>>> deserializing the UUID class with Kryo, and it all looks okay.
>>>>>>>>
>>>>>>>>     I’m not sure what causes this problem. Maybe you can write a
>>>>>>>> very simple demo to check whether it works.
>>>>>>>>
>>>>>>>>
>>>>>>>> Jiayi Liao, Best
>>>>>>>>
>>>>>>>>  Original Message
>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>>> user@flink.apache.org>
>>>>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>> Exception
>>>>>>>>
>>>>>>>> Hi Jiayi,
>>>>>>>> Any further help on this?
>>>>>>>>
>>>>>>>> Jayant Ameta
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>>>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>>>>>>
>>>>>>>>> Jayant Ameta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>    Can you show us the descriptor used in the code below?
>>>>>>>>>>
>>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  Original Message
>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>>>>> user@flink.apache.org>
>>>>>>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>>> Exception
>>>>>>>>>>
>>>>>>>>>> Also, I haven't provided any custom serializer in my flink job.
>>>>>>>>>> Shouldn't the same configuration work for queryable state client?
>>>>>>>>>>
>>>>>>>>>> Jayant Ameta
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <
>>>>>>>>>> wittyam...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Gordon,
>>>>>>>>>>> Following is the stack trace that I'm getting:
>>>>>>>>>>>
>>>>>>>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>>>>>>>>> java.lang.RuntimeException: Failed request 0.*
>>>>>>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>>>>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>>>>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>>>>>>> Encountered unregistered class ID: -985346241*
>>>>>>>>>>> *Serialization trace:*
>>>>>>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>>>>>>>> * at
>>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>>>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>>>>>>>> * at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>>>>>>>> * at
>>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>>>>>>>> * at
>>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>>>>>>>>>>> * at
>>>>>>>>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>>>>>>>>>>> * at
>>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>>>>>>>>>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>>>>>>>>>>> * at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>>>>>>>>>>> * at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>>>>>>>>>>> * at java.lang.Thread.run(Thread.java:748)*
>>>>>>>>>>>
>>>>>>>>>>> I am not using any custom serializer, such as the one Jiayi mentioned.
>>>>>>>>>>>
>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jayant,
>>>>>>>>>>>>
>>>>>>>>>>>>   There should be a serializer parameter in the constructor of
>>>>>>>>>>>> the StateDescriptor. You can create a new serializer like this:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  By the way, can you show us your Kryo exception, as Gordon
>>>>>>>>>>>> asked?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>  Original Message
>>>>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>>>>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<
>>>>>>>>>>>> bupt_...@163.com>
>>>>>>>>>>>> *Cc:* user<user@flink.apache.org>
>>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>>>>> Exception
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Jayant,
>>>>>>>>>>>>
>>>>>>>>>>>> What is the Kryo exception message that you are getting?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Gordon
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (
>>>>>>>>>>>> wittyam...@gmail.com) wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I've not configured any serializer in the descriptor. (Neither
>>>>>>>>>>>> in flink job, nor in state query client).
>>>>>>>>>>>> Which serializer should I use?
>>>>>>>>>>>>
>>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>>    Your code looks right. Are you sure that you’re using the
>>>>>>>>>>>>> same serializer as the Flink program does? Could you show
>>>>>>>>>>>>> the serializer in the descriptor?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Original Message
>>>>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>>>>> *Recipient:* user<user@flink.apache.org>
>>>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro
>>>>>>>>>>>>> Exception
>>>>>>>>>>>>>
>>>>>>>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Key: UUID
>>>>>>>>>>>>> MapState<UUID, String>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Client code snippet:
>>>>>>>>>>>>>
>>>>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any better way to query it?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>>>
>>>>>>>>>>>>
