[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou reassigned FLINK-9804:
---------------------------------

    Assignee: Sihua Zhou (was: vinoyang)

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.5.1
> Reporter: Aljoscha Krettek
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>     final int namespace1ElementsNum = 1000;
>     final int namespace2ElementsNum = 1000;
>     String fieldName = "get-keys-test";
>     AbstractKeyedStateBackend<Integer> backend =
>         createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         final String ns1 = "ns1";
>         MapState<String, Integer> keyedState1 =
>             backend.getPartitionedState(
>                 ns1,
>                 StringSerializer.INSTANCE,
>                 new MapStateDescriptor<>(fieldName,
>                     StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>             );
>         for (int key = 0; key < namespace1ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState1.put("he", key * 2);
>             keyedState1.put("ho", key * 2);
>         }
>         final String ns2 = "ns2";
>         MapState<String, Integer> keyedState2 =
>             backend.getPartitionedState(
>                 ns2,
>                 StringSerializer.INSTANCE,
>                 new MapStateDescriptor<>(fieldName,
>                     StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>             );
>         for (int key = namespace1ElementsNum; key <
>                 namespace1ElementsNum + namespace2ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState2.put("he", key * 2);
>             keyedState2.put("ho", key * 2);
>         }
>         // valid for namespace1
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName,
>                 ns1).sorted()) {
>             PrimitiveIterator.OfInt actualIterator =
>                 keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey,
>                     actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>         // valid for namespace2
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName,
>                 ns2).sorted()) {
>             PrimitiveIterator.OfInt actualIterator =
>                 keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = namespace1ElementsNum;
>                     expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey,
>                     actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>     }
>     finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)