[ 
https://issues.apache.org/jira/browse/FLINK-9336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472273#comment-16472273
 ] 

ASF GitHub Bot commented on FLINK-9336:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/5993
  
    Thanks for the work @sihuazhou and for reporting this @florianschmidt1994 ! 
This fixes the problem described in the JIRA but I will look at the PR a bit 
more thoroughly on Monday at the latest and then merge it if everything is ok.


> Queryable state fails with Exception after task manager restore
> ---------------------------------------------------------------
>
>                 Key: FLINK-9336
>                 URL: https://issues.apache.org/jira/browse/FLINK-9336
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Florian Schmidt
>            Assignee: Sihua Zhou
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
>  
> The following steps can be used to reproduce the Exception:
>  # Run an app with a FlatMap which exposes its MapState through Queryable 
> State with RocksDB
>  # Run a queryable state client: ✔️
>  # Kill the TM
>  # Start a new TM
>  # Run a queryable state client: ❌ (StackTrace below)
>  
> This happens if we run our queryable state client after the new TM has 
> started and the app is running but *before the FlatMap has processed elements 
> again*
>  
> With a little help and some debugging I found out that very likely the reason 
> is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ 
> which is initialised through the code path of 
> _RocksDBMapState::serializeCurrentKeyAndNamespace_. This will only be called 
> when processing an element and therefore accessing the state. If now the 
> state is accessed before that through queryable state, this value will be 0 
> and therefore the deserialization of the user key will fail as seen below. 
>  
> The observed stacktrace
>  
> {code:java}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Error while processing request with ID 0. Caused 
> by: java.lang.RuntimeException: Error while deserializing the user key. at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
>  at 
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
> java.io.DataInputStream.readFully(DataInputStream.java:197) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
>  ... 11 more at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>  at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122)
>  at 
> org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75)
>  Caused by: java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Failed request 0. Caused by: 
> java.lang.RuntimeException: Error while processing request with ID 0. Caused 
> by: java.lang.RuntimeException: Error while deserializing the user key. at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414)
>  at 
> org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at 
> java.io.DataInputStream.readFully(DataInputStream.java:197) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:609) at 
> java.io.DataInputStream.readUTF(DataInputStream.java:564) at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61)
>  at 
> org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412)
>  ... 11 more at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95)
>  at 
> org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>  at 
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748) at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:748)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to