Hi Jayant,

Could you maybe set up a small GitHub project with the client and server
code? Otherwise it is really hard to reproduce the problem. Thanks a lot!

Cheers,
Till

On Tue, Nov 13, 2018 at 11:29 AM Jayant Ameta <wittyam...@gmail.com> wrote:

> Getting the same error even when I added flink-avro dependency to the
> client.
>
> Jayant Ameta
>
>
> On Tue, Nov 13, 2018 at 2:28 PM bupt_ljy <bupt_...@163.com> wrote:
>
>> Hi Jayant,
>>
>>    I don’t know why Flink uses the Avro serializer, which is usually used
>> for POJO classes, but judging from the error message, I think you can add
>> flink-avro as a dependency and try again.
>>
>>
>> Best,
>>
>> Jiayi Liao
>>
>>  Original Message
>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>> *Recipient:* bupt_ljy<bupt_...@163.com>
>> *Cc:* trohrmann<trohrm...@apache.org>; Tzu-Li (Gordon) Tai<
>> tzuli...@apache.org>; user<user@flink.apache.org>
>> *Date:* Tuesday, Nov 13, 2018 16:15
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Thanks Jiayi,
>> I updated the client code to use keyed stream key. The key is a
>> Tuple2<UUID, String>
>>
>> CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>     client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
>>         Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() {}),
>>         descriptor);
>>
>> I'm now getting a different exception. I'm NOT using Avro as a custom
>> serializer. Not sure what causes this issue.
>>
>> Caused by: java.lang.RuntimeException: Error while processing request with 
>> ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find 
>> required Avro dependency.
>>      at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
>>      at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>      at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>      at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>      at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>      at 
>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>      at 
>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>      at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>      at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>      at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>      at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>      at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>      at 
>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>      at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>      at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>      at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>      at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>      at 
>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>      at 
>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>      at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>      at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>>      at 
>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>      at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>      at 
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>      at 
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>      at 
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>      at 
>> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>>      at 
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>      at 
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>      at 
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      ... 1 more
>>
>> Jayant Ameta
>>
>>
>> On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:
>>
>>> Hi, Jayant
>>>
>>>    The key you specified in getKvState function should be the key of the
>>> keyed stream instead of the key of the map. From what I’ve seen on
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
>>> this feature only supports managed keyed state.
>>>
>>>    By the way, I think we should improve the error message for the
>>> situation Jayant ran into.
>>>
>>> Best,
>>> Jiayi Liao
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>> *Recipient:* trohrmann<trohrm...@apache.org>
>>> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<
>>> tzuli...@apache.org>; user<user@flink.apache.org>
>>> *Date:* Tuesday, Nov 13, 2018 13:39
>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>
>>> Hi Till,
>>> Here is the client snippet. Here Rule is a custom POJO that I use.
>>>
>>> public static void main(String[] args)
>>>     throws IOException, InterruptedException, ExecutionException {
>>>   UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>>>
>>>   QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>>>   ExecutionConfig config = new ExecutionConfig();
>>>   client.setExecutionConfig(config);
>>>
>>>   MapStateDescriptor<UUID, Rule> descriptor =
>>>       new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>>>   CompletableFuture<MapState<UUID, Rule>> resultFuture =
>>>       client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
>>>           uuid, TypeInformation.of(UUID.class), descriptor);
>>>
>>>   while (!resultFuture.isDone()) {
>>>     Thread.sleep(1000);
>>>   }
>>>   resultFuture.whenComplete((result, throwable) -> {
>>>     if (throwable != null) {
>>>       throwable.printStackTrace();
>>>     } else {
>>>       try {
>>>         System.out.println(result.get(uuid));
>>>       } catch (Exception e) {
>>>         e.printStackTrace();
>>>       }
>>>     }
>>>   });
>>> }
>>>
>>>
>>> Below is the stack trace:
>>>
>>> Caused by: java.lang.RuntimeException: Error while processing request
>>> with ID 12. Caused by: java.io.IOException: Unable to deserialize key and
>>> namespace. This indicates a mismatch in the key/namespace serializers used
>>> by the KvState instance and this access.
>>> at
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
>>> at
>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>> at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>> at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.EOFException
>>> at
>>> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>> at
>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>> at
>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at
>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>> ... 9 more
>>>
>>> at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>>> at
>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>> at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>>> at
>>> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>>> at
>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>> at
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>> at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
>>> at
>>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>> at
>>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>> at
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> at
>>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>> at
>>> org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
>>> at
>>> org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Could you send us a small example program which we can use to reproduce
>>>> the problem?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yeah, it IS using Kryo serializer.
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>>>>> serialized using a Kryo serializer? You can do this by setting a
>>>>>> breakpoint in the constructor of the `AbstractKeyedStateBackend`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>
>>>>>>> Hi, Jayant
>>>>>>>
>>>>>>>     Your code looks good to me. I also tried serializing and
>>>>>>> deserializing a UUID with Kryo, and it all looks okay.
>>>>>>>
>>>>>>>     I’m not sure what causes this problem. Maybe you can write a very
>>>>>>> simple demo to check whether it works.
>>>>>>>
>>>>>>>
>>>>>>> Jiayi Liao, Best
>>>>>>>
>>>>>>>  Original Message
>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>> user@flink.apache.org>
>>>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>> Exception
>>>>>>>
>>>>>>> Hi Jiayi,
>>>>>>> Any further help on this?
>>>>>>>
>>>>>>> Jayant Ameta
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>>>>>
>>>>>>>> Jayant Ameta
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>    Can you show us the descriptor used in the code below?
>>>>>>>>>
>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Original Message
>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<
>>>>>>>>> user@flink.apache.org>
>>>>>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>> Exception
>>>>>>>>>
>>>>>>>>> Also, I haven't provided any custom serializer in my flink job.
>>>>>>>>> Shouldn't the same configuration work for queryable state client?
>>>>>>>>>
>>>>>>>>> Jayant Ameta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Gordon,
>>>>>>>>>> Following is the stack trace that I'm getting:
>>>>>>>>>>
>>>>>>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException:
>>>>>>>>>> java.lang.RuntimeException: Failed request 0.*
>>>>>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.*
>>>>>>>>>> * Caused by: java.lang.RuntimeException: Error while processing
>>>>>>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>>>>>> Encountered unregistered class ID: -985346241*
>>>>>>>>>> *Serialization trace:*
>>>>>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)*
>>>>>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)*
>>>>>>>>>> * at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)*
>>>>>>>>>> * at
>>>>>>>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)*
>>>>>>>>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)*
>>>>>>>>>> * at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)*
>>>>>>>>>> * at java.lang.Thread.run(Thread.java:748)*
>>>>>>>>>>
>>>>>>>>>> I am not using any custom serializer as mentioned by Jiayi.
>>>>>>>>>>
>>>>>>>>>> Jayant Ameta
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi  Jayant,
>>>>>>>>>>>
>>>>>>>>>>>   There should be a serializer parameter in the constructor of
>>>>>>>>>>> the StateDescriptor. You can create a new serializer like this:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  By the way, can you show us your Kryo exception, as Gordon
>>>>>>>>>>> asked?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Original Message
>>>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>>>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<
>>>>>>>>>>> bupt_...@163.com>
>>>>>>>>>>> *Cc:* user<user@flink.apache.org>
>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro
>>>>>>>>>>> Exception
>>>>>>>>>>>
>>>>>>>>>>> Hi Jayant,
>>>>>>>>>>>
>>>>>>>>>>> What is the Kryo exception message that you are getting?
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Gordon
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (
>>>>>>>>>>> wittyam...@gmail.com) wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> I've not configured any serializer in the descriptor. (Neither
>>>>>>>>>>> in flink job, nor in state query client).
>>>>>>>>>>> Which serializer should I use?
>>>>>>>>>>>
>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>>    It seems that your code is right. Are you sure that you’re
>>>>>>>>>>>> using the same serializer as the Flink program does? Could you show
>>>>>>>>>>>> the serializer in the descriptor?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>>>
>>>>>>>>>>>>  Original Message
>>>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>>>> *Recipient:* user<user@flink.apache.org>
>>>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro
>>>>>>>>>>>> Exception
>>>>>>>>>>>>
>>>>>>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>>>>>>
>>>>>>>>>>>> Key: UUID
>>>>>>>>>>>> MapState<UUID, String>
>>>>>>>>>>>>
>>>>>>>>>>>> Client code snippet:
>>>>>>>>>>>>
>>>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any better way to query it?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Jayant Ameta
>>>>>>>>>>>>
>>>>>>>>>>>
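
A note on the java.io.EOFException in the quoted Nov 13 trace ("Unable to deserialize key and namespace"): it looks consistent with Jiayi's point that the key passed to getKvState has to be the key of the keyed stream. If the client writes only a UUID while the server reads a Tuple2<UUID, String>, the bytes run out when the String field is read. The sketch below is not Flink code. It uses plain java.io streams as a stand-in for the serializers, and the class and method names (KeyMismatchSketch, writeUuidKey, writeTupleKey, readTupleKey) are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.UUID;

public class KeyMismatchSketch {

    // Stand-in for the first client snippet: only the UUID is written as the key.
    static byte[] writeUuidKey(UUID id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
        }
        return bytes.toByteArray();
    }

    // Stand-in for the key the job really uses: a (UUID, String) pair.
    static byte[] writeTupleKey(UUID id, String tag) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
            out.writeUTF(tag);
        }
        return bytes.toByteArray();
    }

    // Stand-in for the server side: it always expects the (UUID, String) layout.
    static String readTupleKey(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            UUID id = new UUID(in.readLong(), in.readLong());
            // Throws EOFException when the bytes stop right after the UUID.
            String tag = in.readUTF();
            return id + "/" + tag;
        }
    }

    public static void main(String[] args) throws IOException {
        UUID id = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

        // Matching layouts: the round trip succeeds.
        System.out.println(readTupleKey(writeTupleKey(id, "test")));

        // Mismatched layouts: the reader runs past the end of the input.
        try {
            readTupleKey(writeUuidKey(id));
        } catch (EOFException e) {
            System.out.println("key layout mismatch: " + e);
        }
    }
}
```

This only mirrors the EOFException case. The later Kryo and Avro errors in the thread involve the actual serializers and are not reproduced by this sketch.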
