[ https://issues.apache.org/jira/browse/FLINK-9336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472121#comment-16472121 ]
ASF GitHub Bot commented on FLINK-9336: --------------------------------------- GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5993 [FLINK-9336][state] fix deserialization problem for queryable MapState ## What is the purpose of the change This PR fixes the deserialization problem for queryable MapState. Currently MapState use the `userKeyOffset` to skip the userKeyBytes when doing deserialization, for queryable state this will bring problem, this PR keeps the `userKeyOffset` in every `RocksDBMapEntry` to avoid the deserialization problem for queryable MapState. ## Brief change log - *Keep the `userKeyOffset` in every `RocksDBMapEntry` to avoid deserialization problem in queryable state.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink FLINK-9336 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5993.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5993 ---- commit 7b8443cd3d7f405bed97f00e1542a1004aef9ff8 Author: sihuazhou <summerleafs@...> Date: 2018-05-11T15:15:07Z keep the userKeyOffset in every MapEntry to avoid deserialization problem in queryable state. ---- > Queryable state fails with Exception after task manager restore > --------------------------------------------------------------- > > Key: FLINK-9336 > URL: https://issues.apache.org/jira/browse/FLINK-9336 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Florian Schmidt > Assignee: Sihua Zhou > Priority: Blocker > > > The following steps can be used to reproduce the Exception: > # Run an app with a FlatMap which exposes its MapState through Queryable > State with RocksDB > # Run a queryable state client: ✔️ > # Kill the TM > # Start a new TM > # Run a queryable state client: ❌ (StackTrace below) > > This happens if we run our queryable state client after the new TM has > started and the app is running but *before the FlatMap has processed elements > again* > > With a little help and some debugging I found out that very likely the reason > is that in the _RocksDBMapState_ there is a private field _userKeyOffset,_ > which is initialised through the code path of > _RocksDBMapState::serializeCurrentKeyAndNamespace_. This will only be called > when processing an element and therefore accessing the state. If now the > state is accessed before that through queryable state, this value will be 0 > and therefore the deserialization of the user key will fail as seen below. > > The observed stacktrace > > {code:java} > Exception in thread "main" java.util.concurrent.ExecutionException: > java.lang.RuntimeException: Failed request 0. Caused by: > java.lang.RuntimeException: Failed request 0. Caused by: > java.lang.RuntimeException: Error while processing request with ID 0. Caused > by: java.lang.RuntimeException: Error while deserializing the user key. at > org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414) > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at > java.io.DataInputStream.readFully(DataInputStream.java:197) at > java.io.DataInputStream.readUTF(DataInputStream.java:609) at > java.io.DataInputStream.readUTF(DataInputStream.java:564) at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412) > ... 11 more at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.getMapState(QsStateClient.java:122) > at > org.apache.flink.streaming.tests.queryablestate.QsStateClient.main(QsStateClient.java:75) > Caused by: java.lang.RuntimeException: Failed request 0. Caused by: > java.lang.RuntimeException: Failed request 0. Caused by: > java.lang.RuntimeException: Error while processing request with ID 0. Caused > by: java.lang.RuntimeException: Error while deserializing the user key. at > org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:414) > at > org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.serializeMap(KvStateSerializer.java:220) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.getSerializedValue(RocksDBMapState.java:288) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:107) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:84) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at > java.io.DataInputStream.readFully(DataInputStream.java:197) at > java.io.DataInputStream.readUTF(DataInputStream.java:609) at > java.io.DataInputStream.readUTF(DataInputStream.java:564) at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:381) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserKey(RocksDBMapState.java:341) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState.access$200(RocksDBMapState.java:61) > at > org.apache.flink.contrib.streaming.state.RocksDBMapState$RocksDBMapEntry.getKey(RocksDBMapState.java:412) > ... 11 more at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:95) > at > org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778) > at > java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140) > at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) at > org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:273) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)