Also, I haven't provided any custom serializer in my Flink job. Shouldn't
the same configuration work for the queryable state client?
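
To make the suspected mismatch concrete, here is a toy model of how an
"unregistered class ID" error can arise when the writing and reading sides
hold different class registrations. It is plain Java and only a sketch:
ToyRegistry, its methods, and the ID numbering are hypothetical and are not
Kryo's or Flink's actual API. Whether this is what happens in the job above
is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Toy illustration only; ToyRegistry is hypothetical, not Kryo's real API. */
public class RegistryMismatchDemo {

    /** Assigns increasing integer IDs to classes in registration order. */
    static final class ToyRegistry {
        private final Map<Class<?>, Integer> idByClass = new HashMap<>();
        private final Map<Integer, Class<?>> classById = new HashMap<>();
        private int nextId = 0;

        ToyRegistry register(Class<?> type) {
            idByClass.put(type, nextId);
            classById.put(nextId, type);
            nextId++;
            return this;
        }

        /** Writes the UUID's class ID, then its two 64-bit halves (20 bytes). */
        byte[] write(UUID value) {
            Integer id = idByClass.get(UUID.class);
            if (id == null) {
                throw new IllegalStateException("UUID is not registered");
            }
            return ByteBuffer.allocate(4 + 16)
                    .putInt(id)
                    .putLong(value.getMostSignificantBits())
                    .putLong(value.getLeastSignificantBits())
                    .array();
        }

        /** Reads the class ID first and rejects IDs this registry does not know. */
        UUID read(byte[] bytes) {
            ByteBuffer in = ByteBuffer.wrap(bytes);
            int id = in.getInt();
            Class<?> type = classById.get(id);
            if (type == null) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            if (type != UUID.class) {
                throw new IllegalStateException("Expected UUID but ID " + id + " maps to " + type.getName());
            }
            return new UUID(in.getLong(), in.getLong());
        }
    }

    /** Returns the decoded UUID as text, or the error message if decoding fails. */
    static String tryRead(ToyRegistry reader, byte[] bytes) {
        try {
            return reader.read(bytes).toString();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        UUID key = UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d");

        // Writer side: UUID ends up with ID 2 because two classes were registered first.
        ToyRegistry writer = new ToyRegistry()
                .register(String.class).register(Long.class).register(UUID.class);
        // Reader with the same registrations in the same order.
        ToyRegistry sameConfig = new ToyRegistry()
                .register(String.class).register(Long.class).register(UUID.class);
        // Reader with different registrations: it only knows ID 0.
        ToyRegistry differentConfig = new ToyRegistry().register(UUID.class);

        byte[] bytes = writer.write(key);
        System.out.println(tryRead(sameConfig, bytes));      // 3b3f17a0-d81a-11e8-bb91-7fd1412de84d
        System.out.println(tryRead(differentConfig, bytes)); // Encountered unregistered class ID: 2
    }
}
```

In this sketch the bytes decode only when both sides registered the same
classes in the same order, which is why the serializer configuration on the
client and in the job would need to match.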

Jayant Ameta


On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com> wrote:

> Hi Gordon,
> Following is the stack trace that I'm getting:
>
> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>   Caused by: java.lang.RuntimeException: Failed request 0.
>   Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
> Serialization trace:
> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
>
> I am not using any custom serializer like the one Jiayi mentioned.
>
> Jayant Ameta
>
>
> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com> wrote:
>
>> Hi  Jayant,
>>
>>   There should be a Serializer parameter in the constructor of the
>> StateDescriptor. You can create a new serializer like this:
>>
>>
>>    new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>
>>
>>  By the way, can you show us your Kryo exception, as Gordon asked?
>>
>>
>> Jiayi Liao, Best
>>
>>
>>
>>  Original Message
>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<
>> bupt_...@163.com>
>> *Cc:* user<user@flink.apache.org>
>> *Date:* Thursday, Oct 25, 2018 17:18
>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>
>> Hi Jayant,
>>
>> What is the Kryo exception message that you are getting?
>>
>> Cheers,
>> Gordon
>>
>>
>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com)
>> wrote:
>>
>> Hi,
>> I've not configured any serializer in the descriptor (neither in the Flink
>> job nor in the state query client).
>> Which serializer should I use?
>>
>> Jayant Ameta
>>
>>
>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> wrote:
>>
>>> Hi,
>>>
>>>    It seems that your code is right. Are you sure that you’re using
>>> the same serializer as the Flink program does? Could you show the serializer
>>> in the descriptor?
>>>
>>>
>>>
>>> Jiayi Liao, Best
>>>
>>>  Original Message
>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>> *Recipient:* user<user@flink.apache.org>
>>> *Date:* Thursday, Oct 25, 2018 14:17
>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>
>>> I get a Kryo exception when querying the state.
>>>
>>> Key: UUID
>>> MapState<UUID, String>
>>>
>>> Client code snippet:
>>>
>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>     client.getKvState(
>>>         JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"),
>>>         "rule",
>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>         TypeInformation.of(new TypeHint<UUID>() {}),
>>>         descriptor);
>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>
>>>
>>> Any better way to query it?
>>>
>>>
>>> Jayant Ameta
>>>
>>
