sumitagrawl commented on code in PR #8203: URL: https://github.com/apache/ozone/pull/8203#discussion_r2077056382
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java: ########## @@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> { } @Override - byte[] key() { - return getRocksDBIterator().get().key(); - } - - @Override - Table.KeyValue<byte[], byte[]> getKeyValue() { + ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() { Review Comment: Instead of having a wrapped ReferenceCountedObject, we can return a Closeable KeyValue pair like the one below. With this, we need to change the interface for all cases and its implementations; the only impacted class will be RDBStoreCodecBufferIterator and its caller. ``` public static final class ManagedTypedKeyValue implements Table.KeyValue<CodecBuffer, CodecBuffer>, AutoCloseable { private final UncheckedAutoCloseable leakTracker = track(this); private final CodecBuffer key; private final CodecBuffer value; private final BiFunction<CodecBuffer, CodecBuffer, Void> releaser; public ManagedTypedKeyValue(CodecBuffer key, CodecBuffer value, BiFunction<CodecBuffer, CodecBuffer, Void> releaser) { this.key = key; this.value = value; this.releaser = releaser; } @Override public CodecBuffer getKey() throws IOException { return key; } @Override public CodecBuffer getValue() throws IOException { return value; } @Override public int getRawSize() throws IOException { return value.readableBytes(); } @Override public void close() throws IOException { try { releaser.apply(key, value); } finally { leakTracker.close(); } } } ``` ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java: ########## @@ -78,32 +84,45 @@ final RAW getPrefix() { @Override public final void forEachRemaining( - Consumer<? super Table.KeyValue<RAW, RAW>> action) { + Consumer<? 
super UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> action) { while (hasNext()) { - action.accept(next()); + UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> entry = next(); + action.accept(entry); } } private void setCurrentEntry() { - if (rocksDBIterator.get().isValid()) { + if (currentEntry != null) { + currentEntry.release(); + } + + boolean isValid = !closed && rocksDBIterator.get().isValid(); + if (isValid) { currentEntry = getKeyValue(); + currentEntry.retain(); } else { currentEntry = null; } + setHasNext(isValid, currentEntry); Review Comment: IMO, this change may not be required, as this is not called in next() at the end to point to the next value. This can be deserialized in hasNext() as well, at the point of use, as earlier. I am not finding any additional advantage of doing it here. ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java: ########## @@ -129,19 +148,23 @@ public final void seekToLast() { } @Override - public final Table.KeyValue<RAW, RAW> seek(RAW key) { + public final UncheckedAutoCloseableSupplier<RawKeyValue<RAW>> seek(RAW key) { seek0(key); setCurrentEntry(); - return currentEntry; + // Current entry should be only closed when the next() and thus closing the returned entry should be a noop. + if (hasNext()) { + return currentEntry.retainAndReleaseOnClose(); + } + return null; } @Override public final void removeFromDB() throws IOException { if (rocksDBTable == null) { throw new UnsupportedOperationException("remove"); } - if (currentEntry != null) { - delete(currentEntry.getKey()); + if (previousKeyValue != null) { Review Comment: In a parallel iterator, this method is error-prone. We cannot support this feature here. Maybe an input can be added for the key or Key-Value pair. 
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RawSpliterator.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; + +/** + * A {@link Table.KeyValueIterator} backed by a raw iterator. + * + * @param <RAW> The raw type. 
+ */ +abstract class RawSpliterator<RAW, KEY, VALUE> implements Table.KeyValueSpliterator<KEY, VALUE> { + + private final ReferenceCountedObject<TableIterator<RAW, + UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>>> rawIterator; + private final AtomicInteger maxNumberOfAdditionalSplits; + private final Lock lock; + private final AtomicReference<IOException> closeException = new AtomicReference<>(); + + abstract Table.KeyValue<KEY, VALUE> convert(RawKeyValue<RAW> kv) throws IOException; + + abstract TableIterator<RAW, UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> getRawIterator( + KEY prefix, KEY startKey, int maxParallelism) throws IOException; + + RawSpliterator(KEY prefix, KEY startKey, int maxParallelism) throws IOException { + TableIterator<RAW, UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> itr = getRawIterator(prefix, + startKey, maxParallelism); + this.lock = new ReentrantLock(); + this.maxNumberOfAdditionalSplits = new AtomicInteger(maxParallelism - 1); Review Comment: Do we really need a reference-counted object for the base iterator? I think it is not required; close should succeed, and any excess close should throw an exception once usage is completed. ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreByteArrayIterator.java: ########## @@ -33,14 +34,10 @@ class RDBStoreByteArrayIterator extends RDBStoreAbstractIterator<byte[]> { } @Override - byte[] key() { - return getRocksDBIterator().get().key(); - } - - @Override - Table.KeyValue<byte[], byte[]> getKeyValue() { + ReferenceCountedObject<RawKeyValue<byte[]>> getKeyValue() { Review Comment: Reference counting can be provided as an atomic counter as part of this itself. 
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java: ########## @@ -218,14 +219,63 @@ public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator() @Override public TableIterator<byte[], KeyValue<byte[], byte[]>> iterator(byte[] prefix) throws IOException { - return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, - prefix); + RDBStoreByteArrayIterator itr = new RDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix); + return new TableIterator<byte[], KeyValue<byte[], byte[]>>() { + @Override + public void seekToFirst() { + itr.seekToFirst(); + } + + @Override + public void seekToLast() { + itr.seekToLast(); + } + + @Override + public KeyValue<byte[], byte[]> seek(byte[] bytes) { + try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> kv = itr.seek(bytes)) { + return kv == null ? null : kv.get(); + } + } + + @Override + public void removeFromDB() throws IOException { + itr.removeFromDB(); + } + + @Override + public void close() { + itr.close(); + } + + @Override + public boolean hasNext() { + return itr.hasNext(); + } + + @Override + public KeyValue<byte[], byte[]> next() { + try (UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>> kv = itr.next()) { + return kv.get(); + } + } + }; } - TableIterator<CodecBuffer, KeyValue<CodecBuffer, CodecBuffer>> iterator( + public TableIterator<byte[], UncheckedAutoCloseableSupplier<RawKeyValue<byte[]>>> closeableSupplierIterator( + byte[] prefix) throws IOException { + return new RDBStoreByteArrayIterator(db.newIterator(family, false), this, prefix); + } + + TableIterator<CodecBuffer, UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> iterator( CodecBuffer prefix) throws IOException { + return iterator(prefix, 2); + } + + TableIterator<CodecBuffer, UncheckedAutoCloseableSupplier<RawKeyValue<CodecBuffer>>> iterator( Review Comment: We can remove this iterator creation here and be part of specialized method, as this needs to be wrapped for 
handling Buffer release. No need to expose it. ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreAbstractIterator.java: ########## @@ -30,30 +32,34 @@ * @param <RAW> the raw type. */ abstract class RDBStoreAbstractIterator<RAW> - implements TableIterator<RAW, Table.KeyValue<RAW, RAW>> { + implements TableIterator<RAW, UncheckedAutoCloseableSupplier<RawKeyValue<RAW>>> { private static final Logger LOG = LoggerFactory.getLogger(RDBStoreAbstractIterator.class); private final ManagedRocksIterator rocksDBIterator; private final RDBTable rocksDBTable; - private Table.KeyValue<RAW, RAW> currentEntry; + private ReferenceCountedObject<RawKeyValue<RAW>> currentEntry; + private RawKeyValue<RAW> previousKeyValue; // This is for schemas that use a fixed-length // prefix for each key. private final RAW prefix; + private boolean hasNext; + private boolean closed; RDBStoreAbstractIterator(ManagedRocksIterator iterator, RDBTable table, RAW prefix) { this.rocksDBIterator = iterator; this.rocksDBTable = table; this.prefix = prefix; + this.currentEntry = null; + this.hasNext = false; + this.closed = false; + this.previousKeyValue = null; Review Comment: We should have currentEntry and nextEntry as per the change in logic. It is confusing that currentEntry refers to nextEntry, as done in next()->setCurrentEntry(). As per the change in method behavior, the naming should also change. Having getKey() and getValue() together is fine; no need to have a separate getKey(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org