liming30 commented on code in PR #20405:
URL: https://github.com/apache/flink/pull/20405#discussion_r940948140


##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java:
##########
@@ -223,4 +223,27 @@ public StateIncrementalVisitor<K, N, V> 
getStateIncrementalVisitor(
         throw new UnsupportedOperationException(
                 "Global state entry iterator is unsupported for RocksDb 
backend");
     }
+
+    /**
+     * Similar to decimal addition, add 1 to the last digit to calculate the 
upper bound.
+     *
+     * @param prefix the starting prefix for seek.
+     * @return end prefix for seek.
+     */
+    protected final byte[] calculateUpperBound(byte[] prefix) {
+        byte[] upperBound = new byte[prefix.length];
+        System.arraycopy(prefix, 0, upperBound, 0, prefix.length);
+        boolean overFlow = true;
+        for (int i = prefix.length - 1; i >= 0; i--) {
+            int unsignedValue = prefix[i] & 0xff;
+            int result = unsignedValue + 1;
+            upperBound[i] = (byte) (result & 0xff);
+            if (result >> 8 == 0) {
+                overFlow = false;
+                break;
+            }
+        }
+        Preconditions.checkArgument(!overFlow, "The upper boundary 
overflows.");

Review Comment:
   Assuming the max parallelism is 128(so we always serialize the KeyGroup with 
1 byte.), the original parallelism is 1.  Then we get a DB instance of 
KeyGroupRange(0, 127), which is `currentKeyGroupRange`.
   
   When our parallelism is adjusted to 2, our new `KeyGroup` assignments are 
[0, 63] and [64, 127]. For `KeyGroupRange`(0, 63) we delete data belonging to 
`KeyGroup`(64,127). Since the `deleteRange` of `RocksDB` contains the left 
border and does not contain the right border, in order to completely delete the 
data less than or equal to `KeyGroup`(127), we add 1 to the `KeyGroup`, which 
makes us actually serialize 128, and the serialized result is 0x80, also 
produces overflow in the Java sense. 
   
   > CompositeKeySerializationUtils.serializeKeyGroup( 
**keyGroupRange.getEndKeyGroup() + 1**, stopKeyGroupPrefixBytes)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to