Hi Jiayi,

Any further help on this?

Jayant Ameta

On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <[email protected]> wrote:

> MapStateDescriptor<UUID, String> descriptor =
>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>
> Jayant Ameta
>
>
> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <[email protected]> wrote:
>
>> Hi,
>>
>> Can you show us the descriptor used in the code below?
>>
>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>
>> Jiayi Liao, Best
>>
>>
>> Original Message
>> *Sender:* Jayant Ameta<[email protected]>
>> *Recipient:* bupt_ljy<[email protected]>
>> *Cc:* Tzu-Li (Gordon) Tai<[email protected]>; user<[email protected]>
>> *Date:* Friday, Oct 26, 2018 02:26
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Also, I haven't provided any custom serializer in my Flink job. Shouldn't
>> the same configuration work for the queryable state client?
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <[email protected]> wrote:
>>
>>> Hi Gordon,
>>> Following is the stack trace that I'm getting:
>>>
>>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>>>  Caused by: java.lang.RuntimeException: Failed request 0.
>>>  Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>>> Serialization trace:
>>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> I am not using any custom serializer as mentioned by Jiayi.
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <[email protected]> wrote:
>>>
>>>> Hi Jayant,
>>>>
>>>> There should be a Serializer parameter in the constructor of the
>>>> StateDescriptor. You should create a new serializer like this:
>>>>
>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>
>>>> By the way, can you show us your Kryo exception, as Gordon asked?
>>>>
>>>> Jiayi Liao, Best
>>>>
>>>>
>>>> Original Message
>>>> *Sender:* Tzu-Li (Gordon) Tai<[email protected]>
>>>> *Recipient:* Jayant Ameta<[email protected]>; bupt_ljy<[email protected]>
>>>> *Cc:* user<[email protected]>
>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>
>>>> Hi Jayant,
>>>>
>>>> What is the Kryo exception message that you are getting?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta ([email protected]) wrote:
>>>>
>>>> Hi,
>>>> I've not configured any serializer in the descriptor (neither in the
>>>> Flink job nor in the state query client).
>>>> Which serializer should I use?
>>>>
>>>> Jayant Ameta
>>>>
>>>>
>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It seems that your code is right. Are you sure that you're using
>>>>> the same Serializer as the Flink program does? Could you show the
>>>>> serializer in the descriptor?
>>>>>
>>>>> Jiayi Liao, Best
>>>>>
>>>>> Original Message
>>>>> *Sender:* Jayant Ameta<[email protected]>
>>>>> *Recipient:* user<[email protected]>
>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>>
>>>>> I get a Kryo exception when querying the state.
>>>>>
>>>>> Key: UUID
>>>>> MapState<UUID, String>
>>>>>
>>>>> Client code snippet:
>>>>>
>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>
>>>>>
>>>>> Any better way to query it?
>>>>>
>>>>>
>>>>> Jayant Ameta
>>>>>
