Hi, Jayant
 The key you specified in getKvState function should be the key of the keyed 
stream instead of the key of the map. From what I’ve seen 
onhttps://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
 this feature only supports managed keyed state.
 By the way, I think we should optimize the error messages with which what 
Jayant met.


Best,
Jiayi Liao


Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:trohrmanntrohrm...@apache.org
Cc:bupt_ljybupt_...@163.com; Tzu-Li (Gordon) taitzuli...@apache.org; 
useru...@flink.apache.org
Date:Tuesday, Nov 13, 2018 13:39
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Hi Till,
Here is the client snippet. Here Rule is a custom POJO that I use.


public static void main(String[] args)
 throws IOException, InterruptedException, ExecutionException {
 UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");

 QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
 ExecutionConfig config = new ExecutionConfig();
 client.setExecutionConfig(config);

 MapStateDescriptorUUID, Rule descriptor = new 
MapStateDescriptor("rulePatterns", UUID.class,
 Rule.class);
 CompletableFutureMapStateUUID, Rule resultFuture =
 client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), 
"rules",
 uuid, TypeInformation.of(UUID.class), descriptor);

 while (!resultFuture.isDone()) {
 Thread.sleep(1000);
 }
 resultFuture.whenComplete((result, throwable) - {
 if (throwable != null) {
 throwable.printStackTrace();
 } else {
 try {
 System.out.println(result.get(uuid));
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 });
}


Below is the stack trace:


Caused by: java.lang.RuntimeException: Error while processing request with ID 
12. Caused by: java.io.IOException: Unable to deserialize key and namespace. 
This indicates a mismatch in the key/namespace serializers used by the KvState 
instance and this access.
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
at 
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at 
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
... 9 more


at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
at 
org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:748)


Jayant Ameta





On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann trohrm...@apache.org wrote:

Could you send us a small example program which we can use to reproduce the 
problem?


Cheers,
Till


On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta wittyam...@gmail.com wrote:

Yeah, it IS using Kryo serializer.


Jayant Ameta





On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann trohrm...@apache.org wrote:

Hi Jayant, could you check that the UUID key on the TM is actually serialized 
using a Kryo serializer? You can do this by setting a breakpoint in the 
constructor of the `AbstractKeyedStateBackend`.


Cheers,
Till


On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy bupt_...@163.com wrote:

Hi, Jayant
  Your code looks good to me. And I’ve tried the serialize/deserialize of Kryo 
on UUID class, it all looks okay.
  I’m not very sure about this problem. Maybe you can write a very simple demo 
to try if it works.


Jiayi Liao, Best


Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:bupt_ljybupt_...@163.com
Cc:Tzu-Li (Gordon) taitzuli...@apache.org; useru...@flink.apache.org
Date:Monday, Oct 29, 2018 11:53
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jiayi,
Any further help on this?


Jayant Ameta





On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta wittyam...@gmail.com wrote:

MapStateDescriptorUUID, String descriptor = new 
MapStateDescriptor("rulePatterns", UUID.class,
 String.class);
Jayant Ameta





On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy bupt_...@163.com wrote:

Hi,
 Can you show us the descriptor in the codes below?
  client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHintUUID() {}), descriptor);


Jiayi Liao, Best




Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:bupt_ljybupt_...@163.com
Cc:Tzu-Li (Gordon) taitzuli...@apache.org; useru...@flink.apache.org
Date:Friday, Oct 26, 2018 02:26
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Also, I haven't provided any custom serializer in my flink job. Shouldn't the 
same configuration work for queryable state client?


Jayant Ameta





On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta wittyam...@gmail.com wrote:

Hi Gordon,
Following is the stack trace that I'm getting:


Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Failed request 0.
Caused by: java.lang.RuntimeException: Error while processing request with ID 
0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: -985346241
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
at 
org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


I am not using any custom serialize as mentioned by Jiayi.


Jayant Ameta





On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy bupt_...@163.com wrote:

Hi Jayant,
 There should be a Serializer parameter in the constructor of the 
StateDescriptor, you should create a new serializer like this:


 new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)


By the way, can you show us your kryo exception like what Gordon said?


Jiayi Liao, Best




Original Message
Sender:Tzu-Li (Gordon) taitzuli...@apache.org
Recipient:Jayant ametawittyam...@gmail.com; bupt_ljybupt_...@163.com
Cc:useru...@flink.apache.org
Date:Thursday, Oct 25, 2018 17:18
Subject:Re: Queryable state when key is UUID - getting Kyro Exception


Hi Jayant,


What is the Kryo exception message that you are getting?


Cheers,
Gordon




On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
Hi,
I've not configured any serializer in the descriptor. (Neither in flink job, 
nor in state query client).
Which serializer should I use?


Jayant Ameta





On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy bupt_...@163.com wrote:

Hi,
 It seems that your codes are right. Are you sure that you’re using the same 
Serializer as the Flink program do? Could you show the serializer in descriptor?




Jiayi Liao, Best


Original Message
Sender:Jayant ametawittyam...@gmail.com
Recipient:useru...@flink.apache.org
Date:Thursday, Oct 25, 2018 14:17
Subject:Queryable state when key is UUID - getting Kyro Exception


I get Kyro exception when querying the state.


Key: UUID
MapStateUUID, String


Client code snippet:


CompletableFutureMapStateUUID, String resultFuture =
 client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), 
"rule",
 UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
 TypeInformation.of(new TypeHintUUID() {}), descriptor);
MapStateUUID, String mapState = resultFuture.get(10, TimeUnit.SECONDS);


Any better way to query it?




Jayant Ameta

Reply via email to