[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou reassigned FLINK-9060: --------------------------------- Assignee: Sihua Zhou > Deleting state using KeyedStateBackend.getKeys() throws Exception > ----------------------------------------------------------------- > > Key: FLINK-9060 > URL: https://issues.apache.org/jira/browse/FLINK-9060 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Reporter: Aljoscha Krettek > Assignee: Sihua Zhou > Priority: Blocker > Fix For: 1.5.0 > > > Adding this test to {{StateBackendTestBase}} showcases the problem: > {code} > @Test > public void testConcurrentModificationWithGetKeys() throws Exception { > AbstractKeyedStateBackend<Integer> backend = > createKeyedBackend(IntSerializer.INSTANCE); > try { > ListStateDescriptor<String> listStateDescriptor = > new ListStateDescriptor<>("foo", > StringSerializer.INSTANCE); > backend.setCurrentKey(1); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Hello"); > backend.setCurrentKey(2); > backend > .getPartitionedState(VoidNamespace.INSTANCE, > VoidNamespaceSerializer.INSTANCE, listStateDescriptor) > .add("Ciao"); > Stream<Integer> keys = backend > .getKeys(listStateDescriptor.getName(), > VoidNamespace.INSTANCE); > keys.forEach((key) -> { > backend.setCurrentKey(key); > try { > backend > .getPartitionedState( > VoidNamespace.INSTANCE, > > VoidNamespaceSerializer.INSTANCE, > listStateDescriptor) > .clear(); > } catch (Exception e) { > e.printStackTrace(); > } > }); > } > finally { > IOUtils.closeQuietly(backend); > backend.dispose(); > } > } > {code} > This should work because one of the use cases of {{getKeys()}} and > {{applyToAllKeys()}} is to do stuff for every key, which includes deleting > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)