swamirishi commented on code in PR #8774:
URL: https://github.com/apache/ozone/pull/8774#discussion_r2212732639
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBBatchOperation.java:
##########
@@ -141,39 +339,99 @@ private class FamilyCache {
      * When dbValue is a byte[]/{@link ByteBuffer}, it represents a put-op.
      * Otherwise, it represents a delete-op (dbValue is {@link Op#DELETE}).
      */
-    private final Map<Bytes, Object> ops = new HashMap<>();
+    private final Map<Bytes, Integer> opsKeys = new HashMap<>();
+    private final Map<Integer, Operation> batchOps = new HashMap<>();
     private boolean isCommit;
     private long batchSize;
     private long discardedSize;
     private int discardedCount;
     private int putCount;
     private int delCount;
+    private int delRangeCount;
+    private AtomicInteger opIndex;

     FamilyCache(ColumnFamily family) {
       this.family = family;
+      this.opIndex = new AtomicInteger(0);
     }

-    /** Prepare batch write for the entire family. */
+    /**
+     * Prepares a batch write operation for a RocksDB-backed system.
+     *
+     * This method ensures the orderly execution of operations accumulated in the batch,
+     * respecting their respective types and order of insertion.
+     *
+     * Key functionalities:
+     * 1. Ensures that the batch is not already committed before proceeding.
+     * 2. Sorts all operations by their `opIndex` to maintain a consistent execution order.
+     * 3. Filters and adapts operations to account for any delete range operations that might
+     *    affect other operations in the batch:
+     *    - Operations with keys that fall within the range specified by a delete range operation
+     *      are discarded.
+     *    - Delete range operations are executed in their correct order.
+     * 4. Applies remaining operations to the write batch, ensuring proper filtering and execution.
+     * 5. Logs a summary of the batch execution for debugging purposes.
+     *
+     * Throws:
+     * - RocksDatabaseException if any error occurs while applying operations to the write batch.
+     *
+     * Prerequisites:
+     * - The method assumes that the operations are represented by `Operation` objects, each of which
+     *   encapsulates the logic for its specific type.
+     * - Delete range operations must be represented by the `DeleteRangeOperation` class.
+     */
     void prepareBatchWrite() throws RocksDatabaseException {
       Preconditions.checkState(!isCommit, "%s is already committed.", this);
       isCommit = true;
-      for (Map.Entry<Bytes, Object> op : ops.entrySet()) {
-        final Bytes key = op.getKey();
-        final Object value = op.getValue();
-        if (value instanceof byte[]) {
-          family.batchPut(writeBatch, key.array(), (byte[]) value);
-        } else if (value instanceof CodecBuffer) {
-          family.batchPut(writeBatch, key.asReadOnlyByteBuffer(),
-              ((CodecBuffer) value).asReadOnlyByteBuffer());
-        } else if (value == Op.DELETE) {
-          family.batchDelete(writeBatch, key.array());
+      // Sort Entries based on opIndex and flush the operation to the batch in the same order.
+      List<Operation> ops = batchOps.entrySet().stream().sorted(Comparator.comparingInt(Map.Entry::getKey))
+          .map(Map.Entry::getValue).collect(Collectors.toList());
+      List<Integer> deleteRangeIndices = new ArrayList<>();
+      int index = 0;
+      for (Operation op : ops) {
+        if (Op.DELETE_RANGE == op.getOpType()) {
+          deleteRangeIndices.add(index);
+        }
+        index++;
+      }
+      // This is to apply the last batch of entries after the last DeleteRangeOperation.
+      deleteRangeIndices.add(ops.size());
+      int startIndex = 0;
+
+      for (Integer deleteRangeIndex : deleteRangeIndices) {
+        DeleteRangeOperation deleteRangeOp = deleteRangeIndex < ops.size() ?
+            (DeleteRangeOperation) ops.get(deleteRangeIndex) : null;
+        Bytes startKey, endKey;
+        if (deleteRangeOp != null) {
+          startKey = new Bytes(deleteRangeOp.startKey);
+          endKey = new Bytes(deleteRangeOp.endKey);
         } else {
-          throw new IllegalStateException("Unexpected value: " + value
-              + ", class=" + value.getClass().getSimpleName());
+          startKey = null;
+          endKey = null;
+        }
+        for (int i = startIndex; i < deleteRangeIndex; i++) {

Review Comment:
   @SaketaChalamchala The outer loop will have deleteRangeIndices = [1, 2, 3].
   - The inner loop performs the put operation <2, val1> between indices [0, 1).
   - After the inner loop, the outer loop applies the delete range [1, 2) at index 1 [line 430].
   - The inner loop then runs between indices [2, 2), which is nothing.
   - After the inner loop, the outer loop applies the delete range [1, 3) at index 2 [line 430].

   So it will perform all the operations. I agree that what we could have done is check for consecutive delete-range indices, e.g. delRange [1, 2) followed by delRange [1, 3), and take the union of the overlapping ranges before performing the put operations, avoiding the extra pass altogether: [1, 2) + [1, 3) is effectively [1, 3). I didn't want a lot of complexity in this logic, but if you feel we should do it, I can make that change.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
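[Editor's note] The reviewer's walkthrough can be reproduced with a small standalone sketch. This is not the Ozone code: `Op`, `replay`, and the integer keys below are hypothetical stand-ins for the PR's `Operation`/`DeleteRangeOperation` hierarchy and its `Bytes` keys. It shows the partition logic: delete-range indices (plus a sentinel at `ops.size()`) split the op list into segments, puts in each segment are discarded only if they fall inside the *next* delete range, and the delete range itself is applied after the segment.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartitionSketch {
  /** Minimal stand-in for the PR's Operation hierarchy (hypothetical). */
  static class Op {
    final boolean isDeleteRange;
    final int key;     // put key, or range start for a delete-range
    final int endKey;  // exclusive range end (delete-range only)
    Op(boolean isDeleteRange, int key, int endKey) {
      this.isDeleteRange = isDeleteRange;
      this.key = key;
      this.endKey = endKey;
    }
  }

  /** Replays ops in insertion order; returns the operations actually applied. */
  static List<String> replay(List<Op> ops) {
    List<String> applied = new ArrayList<>();
    // Collect the indices of the delete-range ops, plus a sentinel at the end
    // so the puts after the last delete-range are also flushed.
    List<Integer> drIndices = new ArrayList<>();
    for (int i = 0; i < ops.size(); i++) {
      if (ops.get(i).isDeleteRange) {
        drIndices.add(i);
      }
    }
    drIndices.add(ops.size());

    int start = 0;
    for (int drIndex : drIndices) {
      Op dr = drIndex < ops.size() ? ops.get(drIndex) : null;
      for (int i = start; i < drIndex; i++) {
        Op op = ops.get(i);
        // Discard a put that the upcoming delete-range would wipe out anyway.
        if (dr != null && op.key >= dr.key && op.key < dr.endKey) {
          continue;
        }
        applied.add("put " + op.key);
      }
      if (dr != null) {
        applied.add("deleteRange [" + dr.key + ", " + dr.endKey + ")");
      }
      start = drIndex + 1;
    }
    return applied;
  }

  public static void main(String[] args) {
    // The reviewer's example: put <2, val1>, then deleteRange [1,2), then [1,3).
    List<Op> ops = new ArrayList<>();
    ops.add(new Op(false, 2, -1));
    ops.add(new Op(true, 1, 2));
    ops.add(new Op(true, 1, 3));
    // put 2 survives the segment before [1,2), then [1,3) is applied after it,
    // so ordering keeps the result correct even without merging the ranges.
    System.out.println(replay(ops));
  }
}
```

Tracing the example gives `[put 2, deleteRange [1, 2), deleteRange [1, 3)]`, matching the walkthrough: correctness comes from applying the ranges in order after the puts, not from the discard check, which only looks at the next delete range.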
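[Editor's note] The union-of-overlapping-ranges idea the reviewer floats is plain interval merging. A minimal sketch, assuming consecutive half-open `[start, end)` ranges with `int` endpoints standing in for the real `byte[]` keys (which would need lexicographic comparison instead); this is the suggested alternative, not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeUnionSketch {
  /**
   * Merges consecutive overlapping or adjacent half-open ranges [start, end)
   * into their union, e.g. [1,2) followed by [1,3) collapses to [1,3).
   * Assumes the ranges are the consecutive delete-ranges recorded by the
   * batch, in insertion order; disjoint ranges are kept separate.
   */
  static int[][] union(List<int[]> ranges) {
    List<int[]> merged = new ArrayList<>();
    for (int[] r : ranges) {
      int[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && r[0] <= last[1] && last[0] <= r[1]) {
        // Overlapping or adjacent: extend the previous range to the union.
        last[0] = Math.min(last[0], r[0]);
        last[1] = Math.max(last[1], r[1]);
      } else {
        merged.add(new int[] {r[0], r[1]});
      }
    }
    return merged.toArray(new int[0][]);
  }

  public static void main(String[] args) {
    List<int[]> ranges = new ArrayList<>();
    ranges.add(new int[] {1, 2});
    ranges.add(new int[] {1, 3});
    int[][] out = union(ranges);
    System.out.println("[" + out[0][0] + ", " + out[0][1] + ")"); // [1, 3)
  }
}
```

With the ranges merged up front, only one delete-range barrier remains per run of consecutive delete ranges, so the put-filtering pass runs once per merged range instead of once per original range.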