sumitagrawl commented on code in PR #8203:
URL: https://github.com/apache/ozone/pull/8203#discussion_r2077056382


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java:
##########
@@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends 
RDBStoreAbstractIterator<byte[]> {
   }
 
   @Override
-  byte[] key() {
-    return getRocksDBIterator().get().key();
-  }
-
-  @Override
-  Table.KeyValue<byte[], byte[]> getKeyValue() {
+  ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() {

Review Comment:
   Instead of having a wrapped ReferenceCountedObject, we can return a closeable KeyValue pair like the one below.
   
   With this, we would not need to change the interface for all cases and its implementations; only RDBStoreCodecBufferIterator and its caller would be impacted.
   
   ```java
     public static final class ManagedTypedKeyValue implements Table.KeyValue<CodecBuffer, CodecBuffer>,
         AutoCloseable {
       private final UncheckedAutoCloseable leakTracker = track(this);
       private final CodecBuffer key;
       private final CodecBuffer value;
       private final BiFunction<CodecBuffer, CodecBuffer, Void> releaser;
   
       public ManagedTypedKeyValue(CodecBuffer key, CodecBuffer value,
           BiFunction<CodecBuffer, CodecBuffer, Void> releaser) {
         this.key = key;
         this.value = value;
         this.releaser = releaser;
       }
   
       @Override
       public CodecBuffer getKey() throws IOException {
         return key;
       }
   
       @Override
       public CodecBuffer getValue() throws IOException {
         return value;
       }
   
       @Override
       public int getRawSize() throws IOException {
         return value.readableBytes();
       }
   
       @Override
       public void close() throws IOException {
         try {
           releaser.apply(key, value);
         } finally {
           leakTracker.close();
         }
       }
     }
   ```
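   A simplified, self-contained sketch of how a caller would consume such a closeable pair with try-with-resources (byte[] stands in for CodecBuffer and a Runnable for the BiFunction releaser; all names here are illustrative, not the actual Ozone API):
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   final class CloseableKeyValueDemo {
     // Simplified stand-in for the suggested ManagedTypedKeyValue.
     static final class CloseablePair implements AutoCloseable {
       private final byte[] key;
       private final byte[] value;
       private final Runnable releaser;
   
       CloseablePair(byte[] key, byte[] value, Runnable releaser) {
         this.key = key;
         this.value = value;
         this.releaser = releaser;
       }
   
       byte[] getKey() { return key; }
       byte[] getValue() { return value; }
   
       @Override
       public void close() {
         releaser.run(); // frees the underlying buffers where the caller finishes
       }
     }
   
     static boolean demo() {
       AtomicBoolean released = new AtomicBoolean(false);
       // try-with-resources guarantees release on scope exit, even on exceptions.
       try (CloseablePair kv = new CloseablePair(new byte[]{1}, new byte[]{2},
           () -> released.set(true))) {
         assert kv.getKey().length == 1;
       }
       return released.get();
     }
   }
   ```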
   



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java:
##########
@@ -78,32 +84,45 @@ final RAW getPrefix() {
 
   @Override
   public final void forEachRemaining(
-      Consumer<? super Table.KeyValue<RAW, RAW>> action) {
+      Consumer<? super UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> 
action) {
     while (hasNext()) {
-      action.accept(next());
+      UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> entry = next();
+      action.accept(entry);
     }
   }
 
   private void setCurrentEntry() {
-    if (rocksDBIterator.get().isValid()) {
+    if (currentEntry != null) {
+      currentEntry.release();
+    }
+
+    boolean isValid = !closed && rocksDBIterator.get().isValid();
+    if (isValid) {
       currentEntry = getKeyValue();
+      currentEntry.retain();
     } else {
       currentEntry = null;
     }
+    setHasNext(isValid, currentEntry);

Review Comment:
   IMO, this change may not be required, since this is not called at the end of next() to advance to the next value. The entry can instead be deserialized lazily in hasNext(), at the point of use, as before. I don't see any additional advantage of doing it here.
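   A minimal sketch of that alternative, materializing the entry on demand in hasNext() instead of eagerly after each positioning call (a plain List iterator stands in for the RocksDB cursor; names are illustrative):
   
   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   
   // Sketch: hasNext() materializes the next entry lazily; next() hands it out.
   final class LazyEntryIterator<T> implements Iterator<T> {
     private final Iterator<T> backing; // stand-in for the RocksDB cursor
     private T nextEntry;               // only populated when hasNext() runs
   
     LazyEntryIterator(List<T> source) {
       this.backing = source.iterator();
     }
   
     @Override
     public boolean hasNext() {
       if (nextEntry == null && backing.hasNext()) {
         nextEntry = backing.next(); // point of use: deserialize here, not in next()
       }
       return nextEntry != null;
     }
   
     @Override
     public T next() {
       if (!hasNext()) {
         throw new NoSuchElementException();
       }
       T result = nextEntry;
       nextEntry = null; // do not hold a reference past the hand-off
       return result;
     }
   }
   ```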



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java:
##########
@@ -129,19 +148,23 @@ public final void seekToLast() {
   }
 
   @Override
-  public final Table.KeyValue<RAW, RAW> seek(RAW key) {
+  public final UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> seek(RAW key) {
     seek0(key);
     setCurrentEntry();
-    return currentEntry;
+    // Current entry should be only closed when the next() and thus closing 
the returned entry should be a noop.
+    if (hasNext()) {
+      return currentEntry.retainAndReleaseOnClose();
+    }
+    return null;
   }
 
   @Override
   public final void removeFromDB() throws IOException {
     if (rocksDBTable == null) {
       throw new UnsupportedOperationException("remove");
     }
-    if (currentEntry != null) {
-      delete(currentEntry.getKey());
+    if (previousKeyValue != null) {

Review Comment:
   With a parallel iterator, this method is error prone; we cannot support this feature as-is. Maybe the key (or key-value pair) could be added as an input instead.
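   A hypothetical shape for that suggestion, taking the key as an explicit argument so removal does not depend on shared per-iterator cursor state (a ConcurrentHashMap stands in for the RocksDB table; names are illustrative):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Sketch: removal keyed by an explicit argument, safe to call from any split
   // of a parallel iterator because it reads no per-iterator state.
   final class KeyedRemovalDemo {
     private final Map<String, String> table = new ConcurrentHashMap<>(); // stand-in for RDBTable
   
     void put(String key, String value) {
       table.put(key, value);
     }
   
     // The caller supplies the key it obtained from its own key-value pair.
     void removeFromDB(String key) {
       if (key == null) {
         throw new UnsupportedOperationException("remove requires an explicit key");
       }
       table.remove(key);
     }
   
     boolean contains(String key) {
       return table.containsKey(key);
     }
   }
   ```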



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+import java.util.Spliterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+
+/**
+ * A {@link Table.KeyValueIterator} backed by a raw iterator.
+ *
+ * @param <RAW> The raw type.
+ */
+abstract class RawSpliterator<RAW, KEY, VALUE> implements 
Table.KeyValueSpliterator<KEY, VALUE> {
+
+  private final ReferenceCountedObject<TableIterator<RAW,
+      UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>>> rawIterator;
+  private final AtomicInteger maxNumberOfAdditionalSplits;
+  private final Lock lock;
+  private final AtomicReference<IOException> closeException = new 
AtomicReference<>();
+
+  abstract Table.KeyValue<KEY, VALUE> convert(RawKeyValue<RAW> kv) throws 
IOException;
+
+  abstract TableIterator<RAW, 
UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> getRawIterator(
+      KEY prefix, KEY startKey, int maxParallelism) throws IOException;
+
+  RawSpliterator(KEY prefix, KEY startKey, int maxParallelism) throws 
IOException {
+    TableIterator<RAW, UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> itr = 
getRawIterator(prefix,
+        startKey, maxParallelism);
+    this.lock = new ReentrantLock();
+    this.maxNumberOfAdditionalSplits = new AtomicInteger(maxParallelism - 1);

Review Comment:
   Do we really need a reference-counted object for the base iterator? I think it is not required: close() should simply succeed, and any excess usage after close should throw an exception.
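   A minimal sketch of that contract without reference counting: close() is idempotent and always succeeds, while any use after close fails fast (an AtomicBoolean guard; names are illustrative):
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   
   // Sketch: plain close-once semantics instead of a ReferenceCountedObject.
   final class CloseOnceIterator implements AutoCloseable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
   
     private void checkOpen() {
       if (closed.get()) {
         throw new IllegalStateException("iterator already closed");
       }
     }
   
     String next() {
       checkOpen(); // excess usage after close throws here
       return "entry";
     }
   
     @Override
     public void close() {
       // Idempotent: first call wins, repeated calls are no-ops that still succeed.
       closed.compareAndSet(false, true);
     }
   }
   ```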



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java:
##########
@@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends 
RDBStoreAbstractIterator<byte[]> {
   }
 
   @Override
-  byte[] key() {
-    return getRocksDBIterator().get().key();
-  }
-
-  @Override
-  Table.KeyValue<byte[], byte[]> getKeyValue() {
+  ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() {

Review Comment:
   Reference counting can be provided by an atomic counter as part of this class itself.
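   An illustrative sketch of folding the reference count into the pair itself via an AtomicInteger, rather than wrapping it in a separate ReferenceCountedObject (byte[] stands in for the raw type, a Runnable for the buffer releaser; names are assumptions, not the Ratis API):
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Sketch: the key-value pair carries its own atomic reference count.
   final class CountedKeyValue {
     private final byte[] key;
     private final byte[] value;
     private final Runnable releaser; // frees the underlying buffers
     private final AtomicInteger refCount = new AtomicInteger(1);
   
     CountedKeyValue(byte[] key, byte[] value, Runnable releaser) {
       this.key = key;
       this.value = value;
       this.releaser = releaser;
     }
   
     byte[] getKey() { return key; }
     byte[] getValue() { return value; }
   
     CountedKeyValue retain() {
       // Reject retains on a pair whose buffers were already released.
       if (refCount.getAndIncrement() <= 0) {
         throw new IllegalStateException("already released");
       }
       return this;
     }
   
     void release() {
       if (refCount.decrementAndGet() == 0) {
         releaser.run(); // last reference gone: free the buffers exactly once
       }
     }
   }
   ```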



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java:
##########
@@ -218,14 +219,63 @@ public TableIterator<byte[], KeyValue<byte[], byte[]>> 
iterator()
   @Override
   public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator(byte[] 
prefix)
       throws IOException {
-    return new RDBStoreByteArrayIterator(db.newIterator(family, false), this,
-        prefix);
+    RDBStoreByteArrayIterator itr  = new 
RDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix);
+    return new TableIterator<byte[], KeyValue<byte[], byte[]>>() {
+      @Override
+      public void seekToFirst() {
+        itr.seekToFirst();
+      }
+
+      @Override
+      public void seekToLast() {
+        itr.seekToLast();
+      }
+
+      @Override
+      public KeyValue<byte[], byte[]> seek(byte[] bytes) {
+        try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> kv = 
itr.seek(bytes)) {
+          return kv == null ? null : kv.get();
+        }
+      }
+
+      @Override
+      public void removeFromDB() throws IOException {
+        itr.removeFromDB();
+      }
+
+      @Override
+      public void close() {
+        itr.close();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return itr.hasNext();
+      }
+
+      @Override
+      public KeyValue<byte[], byte[]> next() {
+        try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> kv = 
itr.next()) {
+          return kv.get();
+        }
+      }
+    };
   }
 
-  TableIterator<CodecBuffer, KeyValue<CodecBuffer, CodecBuffer>> iterator(
+  public TableIterator<byte[], 
UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>>> closeableSupplierIterator(
+      byte[] prefix) throws IOException {
+    return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, 
prefix);
+  }
+
+  TableIterator<CodecBuffer, 
UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> iterator(
       CodecBuffer prefix) throws IOException {
+    return iterator(prefix, 2);
+  }
+
+  TableIterator<CodecBuffer, 
UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> iterator(

Review Comment:
   We can remove this iterator creation here and make it part of the specialized method, since it needs to be wrapped to handle buffer release. There is no need to expose it.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java:
##########
@@ -30,30 +32,34 @@
  * @param <RAW> the raw type.
  */
 abstract class RDBStoreAbstractIterator<RAW>
-    implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> {
+    implements TableIterator<RAW, 
UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RDBStoreAbstractIterator.class);
 
   private final ManagedRocksIterator rocksDBIterator;
   private final RDBTable rocksDBTable;
-  private Table.KeyValue<RAW, RAW> currentEntry;
+  private ReferenceCountedObject<RawKeyValue<RAW>> currentEntry;
+  private RawKeyValue<RAW> previousKeyValue;
   // This is for schemas that use a fixed-length
   // prefix for each key.
   private final RAW prefix;
+  private boolean hasNext;
+  private boolean closed;
 
   RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table,
       RAW prefix) {
     this.rocksDBIterator = iterator;
     this.rocksDBTable = table;
     this.prefix = prefix;
+    this.currentEntry = null;
+    this.hasNext = false;
+    this.closed = false;
+    this.previousKeyValue = null;

Review Comment:
   We should have both currentEntry and nextEntry, as per the change in logic. It is confusing that currentEntry actually refers to the next entry, as done in next() -> setCurrentEntry(). Since the method behavior changed, the naming should change as well.
   
   getKey() and getValue() together are fine; there is no need to have a separate getKey().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

