Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169946803 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public <N> Stream<K> getKeys(String state, N namespace) { + public <N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> namespaceSerializer) { Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final 
RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream<K> targetStream = StreamSupport.stream(((Iterable<K>)()->iteratorWrapper).spliterator(), false); + return targetStream.onClose(() -> { + try { + iteratorWrapper.close(); + } catch (Exception ex) { + LOG.warn("Release RocksIteratorWrapper failed.", ex); + } + }); + } catch (Exception ex) { --- End diff -- As mentioned in my previous comment, we can solve this without any `try-catch` here: just create the native iterator further down, at a point where no more exceptions can be thrown, i.e. ``` (...) RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes); Stream<K> targetStream = StreamSupport.stream(((Iterable<K>) () -> iteratorWrapper).spliterator(), false); return targetStream.onClose(iteratorWrapper::close); ```
---