Hi Till, Here is the client snippet. Here Rule is a custom POJO that I use.
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb"); QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069); ExecutionConfig config = new ExecutionConfig(); client.setExecutionConfig(config); MapStateDescriptor<UUID, Rule> descriptor = new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class); CompletableFuture<MapState<UUID, Rule>> resultFuture = client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules", uuid, TypeInformation.of(UUID.class), descriptor); while (!resultFuture.isDone()) { Thread.sleep(1000); } resultFuture.whenComplete((result, throwable) -> { if (throwable != null) { throwable.printStackTrace(); } else { try { System.out.println(result.get(uuid)); } catch (Exception e) { e.printStackTrace(); } } }); } Below is the stack trace: Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access. at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107) at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307) at org.apache.flink.types.StringValue.readString(StringValue.java:770) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69) at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94) ... 9 more at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98) at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563) at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at java.lang.Thread.run(Thread.java:748) Jayant Ameta On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org> wrote: > Could you send us a small example program which we can use to reproduce > the problem? > > Cheers, > Till > > On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com> wrote: > >> Yeah, it IS using Kryo serializer. >> >> Jayant Ameta >> >> >> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Jayant, could you check that the UUID key on the TM is actually >>> serialized using a Kryo serializer? You can do this by setting a breakpoint >>> in the constructor of the `AbstractKeyedStateBackend`. >>> >>> Cheers, >>> Till >>> >>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote: >>> >>>> Hi, Jayant >>>> >>>> Your code looks good to me. And I’ve tried the >>>> serialize/deserialize of Kryo on UUID class, it all looks okay. >>>> >>>> I’m not very sure about this problem. Maybe you can write a very >>>> simple demo to try if it works. >>>> >>>> >>>> Jiayi Liao, Best >>>> >>>> Original Message >>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>> *Recipient:* bupt_ljy<bupt_...@163.com> >>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user< >>>> user@flink.apache.org> >>>> *Date:* Monday, Oct 29, 2018 11:53 >>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>> Exception >>>> >>>> Hi Jiayi, >>>> Any further help on this? >>>> >>>> Jayant Ameta >>>> >>>> >>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com> >>>> wrote: >>>> >>>>> MapStateDescriptor<UUID, String> descriptor = new >>>>> MapStateDescriptor<>("rulePatterns", UUID.class, >>>>> String.class); >>>>> >>>>> Jayant Ameta >>>>> >>>>> >>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> Can you show us the descriptor in the codes below? >>>>>> >>>>>> client.getKvState(JobID.fromHexString( >>>>>> "c7b8af14b8afacf4fac16cdd0da7e997"), "rule", >>>>>> >>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"), >>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor); >>>>>>> >>>>>>> >>>>>> Jiayi Liao, Best >>>>>> >>>>>> >>>>>> Original Message >>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>>>> *Recipient:* bupt_ljy<bupt_...@163.com> >>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user< >>>>>> user@flink.apache.org> >>>>>> *Date:* Friday, Oct 26, 2018 02:26 >>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>>>> Exception >>>>>> >>>>>> Also, I haven't provided any custom serializer in my flink job. >>>>>> Shouldn't the same configuration work for queryable state client? >>>>>> >>>>>> Jayant Ameta >>>>>> >>>>>> >>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Gordon, >>>>>>> Following is the stack trace that I'm getting: >>>>>>> >>>>>>> *Exception in thread "main" java.util.concurrent.ExecutionException: >>>>>>> java.lang.RuntimeException: Failed request 0.* >>>>>>> * Caused by: java.lang.RuntimeException: Failed request 0.* >>>>>>> * Caused by: java.lang.RuntimeException: Error while processing >>>>>>> request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: >>>>>>> Encountered unregistered class ID: -985346241* >>>>>>> *Serialization trace:* >>>>>>> *$outer (scala.collection.convert.Wrappers$SeqWrapper)* >>>>>>> * at >>>>>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)* >>>>>>> * at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)* >>>>>>> * at >>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)* >>>>>>> * at >>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)* >>>>>>> * at >>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)* >>>>>>> * at >>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)* >>>>>>> * at >>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)* >>>>>>> * at >>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)* >>>>>>> * at >>>>>>> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)* >>>>>>> * at >>>>>>> org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)* >>>>>>> * at >>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)* >>>>>>> * at >>>>>>> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)* >>>>>>> * at >>>>>>> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)* >>>>>>> * at >>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)* >>>>>>> * at java.util.concurrent.FutureTask.run(FutureTask.java:266)* >>>>>>> * at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)* >>>>>>> * at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)* >>>>>>> * at java.lang.Thread.run(Thread.java:748)* >>>>>>> >>>>>>> I am not using any custom serialize as mentioned by Jiayi. >>>>>>> >>>>>>> Jayant Ameta >>>>>>> >>>>>>> >>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com> wrote: >>>>>>> >>>>>>>> Hi Jayant, >>>>>>>> >>>>>>>> There should be a Serializer parameter in the constructor of the >>>>>>>> StateDescriptor, you should create a new serializer like this: >>>>>>>> >>>>>>>> >>>>>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env >>>>>>>> .getConfig) >>>>>>>> >>>>>>>> >>>>>>>> By the way, can you show us your kryo exception like what Gordon >>>>>>>> said? >>>>>>>> >>>>>>>> >>>>>>>> Jiayi Liao, Best >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Original Message >>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org> >>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy< >>>>>>>> bupt_...@163.com> >>>>>>>> *Cc:* user<user@flink.apache.org> >>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18 >>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro >>>>>>>> Exception >>>>>>>> >>>>>>>> Hi Jayant, >>>>>>>> >>>>>>>> What is the Kryo exception message that you are getting? >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Gordon >>>>>>>> >>>>>>>> >>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta ( >>>>>>>> wittyam...@gmail.com) wrote: >>>>>>>> >>>>>>>> Hi, >>>>>>>> I've not configured any serializer in the descriptor. (Neither in >>>>>>>> flink job, nor in state query client). >>>>>>>> Which serializer should I use? >>>>>>>> >>>>>>>> Jayant Ameta >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> It seems that your codes are right. Are you sure that you’re >>>>>>>>> using the same Serializer as the Flink program do? Could you show the >>>>>>>>> serializer in descriptor? >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Jiayi Liao, Best >>>>>>>>> >>>>>>>>> Original Message >>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com> >>>>>>>>> *Recipient:* user<user@flink.apache.org> >>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17 >>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro >>>>>>>>> Exception >>>>>>>>> >>>>>>>>> I get Kyro exception when querying the state. >>>>>>>>> >>>>>>>>> Key: UUID >>>>>>>>> MapState<UUID, String> >>>>>>>>> >>>>>>>>> Client code snippet: >>>>>>>>> >>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture = >>>>>>>>> >>>>>>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), >>>>>>>>> "rule", >>>>>>>>> UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"), >>>>>>>>> TypeInformation.of(new TypeHint<UUID>() {}), descriptor); >>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, >>>>>>>>> TimeUnit.SECONDS); >>>>>>>>> >>>>>>>>> >>>>>>>>> Any better way to query it? >>>>>>>>> >>>>>>>>> >>>>>>>>> Jayant Ameta >>>>>>>>> >>>>>>>>