[ 
https://issues.apache.org/jira/browse/FLINK-9336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Florian Schmidt updated FLINK-9336:
-----------------------------------
    Description: 
 

The following steps can be used to reproduce the Exception:
 # Run an app with a FlatMap which exposes its MapState through Queryable State 
with RocksDB
 # Run a queryable state client: ✔️
 # Kill the TM
 # Start a new TM
 # Run a queryable state client: ❌ (StackTrace below)

 

This happens if we run our queryable state client after the new TM has started 
and the app is running but *before the FlatMap has processed elements again*

 

With a little help and some debugging I found out that very likely the reason 
is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ 
**which is initialised through the code path of 
_RocksDBMapState::serializeCurrentKeyAndNamespace.* *_This will only be called 
when processing an element and therefore accessing the state. If now the state 
is accessed before that through queryable state, this value will be 0 and 
therefore the deserialization of the user key will fail as seen below. 

 

The observed stacktrace

 
{code:java}
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
 at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
 Caused by: java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748)
{code}
 

  was:
 

The following example can be used to reproduce the Exception:
 # Run an app with a FlatMap which exposes its MapState through Queryable State 
with RocksDB
 # Run a queryable state client: ✔️
 # Kill the TM
 # Start a new TM
 # Run a queryable state client: ❌ (StackTrace below)

 

This happens if we run our queryable state client after the new TM has started 
and the app is running but *before the FlatMap has processed elements again*

 

With a little help and some debugging I found out that very likely the reason 
is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ 
**which is initialised through the code path of 
_RocksDBMapState::serializeCurrentKeyAndNamespace. **_This will only be called 
when processing an element and therefore accessing the state. If now the state 
is accessed before that through queryable state, this value will be 0 and 
therefore the deserialization of the user key will fail as seen below. 

 

The observed stacktrace

 
{code:java}
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
 at 
org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
 Caused by: java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Failed request 0. Caused by: 
java.lang.RuntimeException: Error while processing request with ID 0. Caused 
by: java.lang.RuntimeException: Error while deserializing the user key. at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
 at 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
java.io.DataInputStream.readFully(DataInputStream.java:197) at 
java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
 at 
org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
 ... 11 more at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
 at 
org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
 at 
java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
 at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) at 
org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at 
org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748)
{code}
 


> Queryable state fails with Exception after task manager restore
> ---------------------------------------------------------------
>
>                 Key: FLINK-9336
>                 URL: https://issues.apache.org/jira/browse/FLINK-9336
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Florian Schmidt
>            Priority: Major
>
>  
> The following steps can be used to reproduce the Exception:
>  # Run an app with a FlatMap which exposes its MapState through Queryable 
> State with RocksDB
>  # Run a queryable state client: ✔️
>  # Kill the TM
>  # Start a new TM
>  # Run a queryable state client: ❌ (StackTrace below)
>  
> This happens if we run our queryable state client after the new TM has 
> started and the app is running but *before the FlatMap has processed elements 
> again*
>  
> With a little help and some debugging I found out that very likely the reason 
> is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ 
> **which is initialised through the code path of 
> _RocksDBMapState::serializeCurrentKeyAndNamespace.* *_This will only be 
> called when processing an element and therefore accessing the state. If now 
> the state is accessed before that through queryable state, this value will be 
> 0 and therefore the deserialization of the user key will fail as seen below. 
>  
> The observed stacktrace
>  
> {code:java}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Error while processing request with ID 0. Caused 
> by: java.lang.RuntimeException: Error while deserializing the user key. at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
>  at 
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
> java.io.DataInputStream.readFully(DataInputStream.java:197) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
>  ... 11 more at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>  at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
>  Caused by: java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Error while processing request with ID 0. Caused 
> by: java.lang.RuntimeException: Error while deserializing the user key. at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
>  at 
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
> java.io.DataInputStream.readFully(DataInputStream.java:197) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
>  ... 11 more at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>  at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to