Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169950751 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace( Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); keySerializationStream.reset(); - writeKeyGroup(keyGroup, keySerializationDataOutputView); - writeKey(key, keySerializationStream, keySerializationDataOutputView); - writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); + AbstractRocksDBUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView); + AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); + AbstractRocksDBUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } - private void writeKeyGroup( - int keyGroup, - DataOutputView keySerializationDateDataOutputView) throws IOException { - for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { - keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); - } + protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int keyGroup = AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView); + K key = AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, inputView, ambiguousKeyPossible); + N namespace = AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible); + + return new Tuple3<>(keyGroup, key, namespace); } - private void writeKey( - K key, - ByteArrayOutputStreamWithPos keySerializationStream, - 
DataOutputView keySerializationDataOutputView) throws IOException { - //write key - int beforeWrite = keySerializationStream.getPosition(); - backend.getKeySerializer().serialize(key, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write size of key - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); + /** + * Utils for RocksDB state serialization and deserialization. + */ + static class AbstractRocksDBUtils { --- End diff -- The name of this class already suggests that it would be better placed in its own file, maybe as `RocksDBKeySerializationUtils`. Now that the methods are public and used in different places, I also suggest adding a test to guard their behavior against accidental code changes.
---