[ https://issues.apache.org/jira/browse/FLINK-9269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stefan Richter closed FLINK-9269. --------------------------------- Resolution: Fixed Merged in: master: 14e7d35f26 release-1.5: 3ba21adc0e > Concurrency problem in HeapKeyedStateBackend when performing checkpoint async > ----------------------------------------------------------------------------- > > Key: FLINK-9269 > URL: https://issues.apache.org/jira/browse/FLINK-9269 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Critical > Fix For: 1.6.0 > > > {code:java} > @Test > public void testConccurrencyProblem() throws Exception { > CheckpointStreamFactory streamFactory = createStreamFactory(); > Environment env = new DummyEnvironment(); > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE, env); > try { > long checkpointID = 0; > List<Future> futureList = new ArrayList(); > for (int i = 0; i < 10; ++i) { > ValueStateDescriptor<Integer> kvId = new > ValueStateDescriptor<>("id" + i, IntSerializer.INSTANCE); > ValueState<Integer> state = > backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); > ((InternalValueState) > state).setCurrentNamespace(VoidNamespace.INSTANCE); > backend.setCurrentKey(i); > state.update(i); > > futureList.add(runSnapshotAsync(backend.snapshot(checkpointID++, > System.currentTimeMillis(), streamFactory, > CheckpointOptions.forCheckpointWithDefaultLocation()))); > } > for (Future future : futureList) { > future.get(); > } > } catch (Exception e) { > fail(); > } finally { > backend.dispose(); > } > } > protected Future<?> runSnapshotAsync( > RunnableFuture<SnapshotResult<KeyedStateHandle>> > snapshotRunnableFuture) throws Exception { > if (!snapshotRunnableFuture.isDone()) { > return Executors.newFixedThreadPool(5).submit(() -> { > try { > snapshotRunnableFuture.run(); > snapshotRunnableFuture.get(); > } catch (Exception e) { > e.printStackTrace(); > fail(); > } > }); > } > return null; > } > {code} > Place the above code in `StateBackendTestBase` and run > `AsyncMemoryStateBackendTest`, it will get the follows exception > {code} > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) > at > org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) > ... 5 more > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:85) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:716) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:662) > at > org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.runtime.state.AsyncMemoryStateBackendTest.lambda$runSnapshotAsync$0(AsyncMemoryStateBackendTest.java:84) > ... 5 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)