devmadhuu commented on code in PR #8091: URL: https://github.com/apache/ozone/pull/8091#discussion_r1997482966
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. + */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); 
+ } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + Queue<Future<?>> iterFutures = new LinkedList<>(); + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? 
extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (kv.getKey().compareTo(end) < 0 || (inclusive && kv.getKey().compareTo(endKey) <= 0)) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + if (!keyValues.isEmpty()) { + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + workerFutures.add(valueExecutors.submit(() -> { + for (Table.KeyValue<K, V> kv : keyValues) { + keyOperation.apply(kv); + } + keyCounter.addAndGet(keyValues.size()); + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + synchronized (keyCounter) { + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + long cnt = keyCounter.get(); + LOG.info("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName); + prevLogCounter.set(cnt); + } + } + } + })); + } + if (reachedEnd) { + break; + } + } + } catch (IOException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); Review Comment: Here, InterruptedException is caught but not handled properly. Instead of just wrapping it in RuntimeException, it should restore the thread's interrupted state. ```suggestion if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); // Restore interrupt status } throw new RuntimeException(e); ``` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + Queue<Future<?>> iterFutures = new LinkedList<>(); + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? 
extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (kv.getKey().compareTo(end) < 0 || (inclusive && kv.getKey().compareTo(endKey) <= 0)) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + if (!keyValues.isEmpty()) { + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + workerFutures.add(valueExecutors.submit(() -> { + for (Table.KeyValue<K, V> kv : keyValues) { + keyOperation.apply(kv); + } + keyCounter.addAndGet(keyValues.size()); + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + synchronized (keyCounter) { + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + long cnt = keyCounter.get(); + LOG.info("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName); + prevLogCounter.set(cnt); + } + } + } + })); + } + if (reachedEnd) { + break; + } + } + } catch (IOException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + })); + } + waitForQueueSize(iterFutures, 0); + waitForQueueSize(workerFutures, 0); + } + + @Override + public void close() throws IOException { Review Comment: shutdown() is used but does not guarantee that all tasks are completed. 
```suggestion public void close() throws IOException { iteratorExecutor.shutdown(); valueExecutors.shutdown(); try { if (!iteratorExecutor.awaitTermination(1, TimeUnit.MINUTES)) { iteratorExecutor.shutdownNow(); } if (!valueExecutors.awaitTermination(1, TimeUnit.MINUTES)) { valueExecutors.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } ``` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); Review Comment: Currently, `waitForQueueSize` ensures that too many futures aren’t queued up, but method blocks the main thread, reducing parallelism, so instead of blocking with `f.get()`, collect `CompletableFutures` and use `allOf()`. May be something like below: ```suggestion CompletableFuture.allOf(workerFutures.toArray(new CompletableFuture[0])).join(); ``` This will wait for all worker tasks to complete without blocking individual threads. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithOBS.java: ########## @@ -49,66 +52,88 @@ public class NSSummaryTaskWithOBS extends NSSummaryTaskDbEventHandler { LoggerFactory.getLogger(NSSummaryTaskWithOBS.class); private final long nsSummaryFlushToDBMaxThreshold; - + private final int maxKeysInMemory; + private final int maxIterators; + private final int maxWorkers; public NSSummaryTaskWithOBS( ReconNamespaceSummaryManager reconNamespaceSummaryManager, ReconOMMetadataManager reconOMMetadataManager, - long nsSummaryFlushToDBMaxThreshold) { + long nsSummaryFlushToDBMaxThreshold, int maxIterators, int maxWorkers, int maxKeysInMemory) { super(reconNamespaceSummaryManager, reconOMMetadataManager); this.nsSummaryFlushToDBMaxThreshold = nsSummaryFlushToDBMaxThreshold; + this.maxIterators = maxIterators; + this.maxWorkers = maxWorkers; + this.maxKeysInMemory = maxKeysInMemory; } public boolean reprocessWithOBS(OMMetadataManager omMetadataManager) { Map<Long, NSSummary> nsSummaryMap = new HashMap<>(); - + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Review Comment: Same comment if we can void explicit locks 
if you can change the `nsSummaryMap` here as well to `ConcurrentHashMap` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskDbEventHandler.java: ########## @@ -79,24 +79,27 @@ protected void handlePutKeyEvent(OmKeyInfo keyInfo, Map<Long, NSSummary> nsSummaryMap) throws IOException { long parentObjectId = keyInfo.getParentObjectID(); // Try to get the NSSummary from our local map that maps NSSummaries to IDs - NSSummary nsSummary = nsSummaryMap.get(parentObjectId); - if (nsSummary == null) { - // If we don't have it in this batch we try to get it from the DB - nsSummary = reconNamespaceSummaryManager.getNSSummary(parentObjectId); - } - if (nsSummary == null) { - // If we don't have it locally and in the DB we create a new instance - // as this is a new ID - nsSummary = new NSSummary(); - } - int[] fileBucket = nsSummary.getFileSizeBucket(); - nsSummary.setNumOfFiles(nsSummary.getNumOfFiles() + 1); - nsSummary.setSizeOfFiles(nsSummary.getSizeOfFiles() + keyInfo.getDataSize()); - int binIndex = ReconUtils.getFileSizeBinIndex(keyInfo.getDataSize()); - - ++fileBucket[binIndex]; - nsSummary.setFileSizeBucket(fileBucket); - nsSummaryMap.put(parentObjectId, nsSummary); + nsSummaryMap.compute(parentObjectId, (k, v) -> { + if (v == null) { + try { + v = reconNamespaceSummaryManager.getNSSummary(parentObjectId); + if (v == null) { + v = new NSSummary(); + } + } catch (IOException e) { + throw new RuntimeException(e); Review Comment: Can we log the error here ? and check in case of thread crashed, they will not crash silently. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java: ########## @@ -104,22 +123,36 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { // configured batch threshold. // containerKeyCountMap can be flushed at the end since the number // of containers in a cluster will not have significant memory overhead. 
- Table<String, OmKeyInfo> omKeyInfoTable = - omMetadataManager.getKeyTable(layout); - try ( - TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue<String, OmKeyInfo> kv = keyIter.next(); - OmKeyInfo omKeyInfo = kv.getValue(); - handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap, - containerKeyCountMap); - if (!checkAndCallFlushToDB(containerKeyMap)) { - LOG.error("Unable to flush containerKey information to the DB"); - return buildTaskResult(false); + Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(layout); + Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> { + try { + try { + lock.readLock().lock(); + handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, + containerKeyCountMap); + } finally { + lock.readLock().unlock(); } - omKeyCount++; + omKeyCount.incrementAndGet(); + if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) { + try { + lock.writeLock().lock(); + if (!checkAndCallFlushToDB(containerKeyMap)) { + throw new UncheckedIOException(new IOException("Unable to flush containerKey information to the DB")); + } + } finally { + lock.writeLock().unlock(); + } + } + return null; + } catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: Do we need to wrap here ? IMO, we should log and rethrow to preserve context. 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java: ########## @@ -81,21 +100,21 @@ public ContainerKeyMapperTask(ReconContainerMetadataManager */ @Override public TaskResult reprocess(OMMetadataManager omMetadataManager) { - long omKeyCount = 0; + AtomicLong omKeyCount = new AtomicLong(0); // In-memory maps for fast look up and batch write // (container, key) -> count - Map<ContainerKeyPrefix, Integer> containerKeyMap = new HashMap<>(); + Map<ContainerKeyPrefix, Integer> containerKeyMap = new ConcurrentHashMap<>(); // containerId -> key count - Map<Long, Long> containerKeyCountMap = new HashMap<>(); + Map<Long, Long> containerKeyCountMap = new ConcurrentHashMap<>(); try { LOG.debug("Starting a 'reprocess' run of ContainerKeyMapperTask."); Instant start = Instant.now(); - // initialize new container DB reconContainerMetadataManager .reinitWithNewContainerDataFromOm(new HashMap<>()); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Review Comment: `StampedLock` could be a better alternative in high loads which supports optimistic reads. ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + Queue<Future<?>> iterFutures = new LinkedList<>(); + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? 
extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (kv.getKey().compareTo(end) < 0 || (inclusive && kv.getKey().compareTo(endKey) <= 0)) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + if (!keyValues.isEmpty()) { + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + workerFutures.add(valueExecutors.submit(() -> { + for (Table.KeyValue<K, V> kv : keyValues) { + keyOperation.apply(kv); + } + keyCounter.addAndGet(keyValues.size()); + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + synchronized (keyCounter) { + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + long cnt = keyCounter.get(); + LOG.info("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName); + prevLogCounter.set(cnt); + } + } + } + })); + } + if (reachedEnd) { + break; + } + } + } catch (IOException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); Review Comment: Can we also log the error ? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); Review Comment: can we implement timeouts in `waitForQueueSize` to prevent waiting indefinitely ? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + Queue<Future<?>> iterFutures = new LinkedList<>(); + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? 
extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (kv.getKey().compareTo(end) < 0 || (inclusive && kv.getKey().compareTo(endKey) <= 0)) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + if (!keyValues.isEmpty()) { + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + workerFutures.add(valueExecutors.submit(() -> { Review Comment: ```suggestion CompletableFuture.runAsync(() -> { for (Table.KeyValue<K, V> kv : keyValues) { keyOperation.apply(kv); } }, valueExecutors).exceptionally(ex -> { LOG.error("Exception in key processing", ex); return null; }); ``` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java: ########## @@ -104,22 +123,36 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { // configured batch threshold. // containerKeyCountMap can be flushed at the end since the number // of containers in a cluster will not have significant memory overhead. - Table<String, OmKeyInfo> omKeyInfoTable = - omMetadataManager.getKeyTable(layout); - try ( - TableIterator<String, ? 
extends Table.KeyValue<String, OmKeyInfo>> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue<String, OmKeyInfo> kv = keyIter.next(); - OmKeyInfo omKeyInfo = kv.getValue(); - handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap, - containerKeyCountMap); - if (!checkAndCallFlushToDB(containerKeyMap)) { - LOG.error("Unable to flush containerKey information to the DB"); - return buildTaskResult(false); + Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(layout); + Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> { + try { + try { + lock.readLock().lock(); + handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, + containerKeyCountMap); + } finally { + lock.readLock().unlock(); } - omKeyCount++; + omKeyCount.incrementAndGet(); + if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) { + try { + lock.writeLock().lock(); + if (!checkAndCallFlushToDB(containerKeyMap)) { + throw new UncheckedIOException(new IOException("Unable to flush containerKey information to the DB")); + } + } finally { + lock.writeLock().unlock(); + } + } + return null; + } catch (IOException e) { + throw new UncheckedIOException(e); } + }; + try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter = + new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable, + StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, containerKeyFlushToDBMaxThreshold)) { + keyIter.performTaskOnTableVals(this.getTaskName(), null, null, kvOperation); Review Comment: Thread pools (`iteratorExecutor` and `valueExecutors`) are only shut down in close(). If `performTaskOnTableVals` is interrupted, threads may remain active, leading to a resource leak. Can we implement try-finally in `performTaskOnTableVals` ? ```suggestion try { keyIter.performTaskOnTableVals(this.getTaskName(), null, null, kvOperation); } finally { close(); // Ensures shutdown even if an exception occurs. 
} ``` ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperTask.java: ########## @@ -104,22 +123,36 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { // configured batch threshold. // containerKeyCountMap can be flushed at the end since the number // of containers in a cluster will not have significant memory overhead. - Table<String, OmKeyInfo> omKeyInfoTable = - omMetadataManager.getKeyTable(layout); - try ( - TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> - keyIter = omKeyInfoTable.iterator()) { - while (keyIter.hasNext()) { - Table.KeyValue<String, OmKeyInfo> kv = keyIter.next(); - OmKeyInfo omKeyInfo = kv.getValue(); - handleKeyReprocess(kv.getKey(), omKeyInfo, containerKeyMap, - containerKeyCountMap); - if (!checkAndCallFlushToDB(containerKeyMap)) { - LOG.error("Unable to flush containerKey information to the DB"); - return buildTaskResult(false); + Table<String, OmKeyInfo> omKeyInfoTable = omMetadataManager.getKeyTable(layout); + Function<Table.KeyValue<String, OmKeyInfo>, Void> kvOperation = kv -> { + try { + try { + lock.readLock().lock(); + handleKeyReprocess(kv.getKey(), kv.getValue(), containerKeyMap, + containerKeyCountMap); + } finally { + lock.readLock().unlock(); } - omKeyCount++; + omKeyCount.incrementAndGet(); + if (containerKeyMap.size() >= containerKeyFlushToDBMaxThreshold) { + try { + lock.writeLock().lock(); + if (!checkAndCallFlushToDB(containerKeyMap)) { + throw new UncheckedIOException(new IOException("Unable to flush containerKey information to the DB")); Review Comment: Can we avoid wrapping here to preserve context after logging ? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. 
+ */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); + } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || 
key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); + } + } + + public void performTaskOnTableVals(String taskName, K startKey, K endKey, + Function<Table.KeyValue<K, V>, Void> keyOperation) throws IOException, ExecutionException, InterruptedException { + List<K> bounds = getBounds(startKey, endKey); + Queue<Future<?>> iterFutures = new LinkedList<>(); + Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>(); + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + boolean inclusive = idx == bounds.size() - 1; + waitForQueueSize(iterFutures, maxIteratorTasks - 1); + iterFutures.add(iteratorExecutor.submit(() -> { + try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) { + iter.seek(beg); + while (iter.hasNext()) { + List<Table.KeyValue<K, V>> keyValues = new ArrayList<>(); + boolean reachedEnd = false; + while (iter.hasNext()) { + Table.KeyValue<K, V> kv = iter.next(); + if (kv.getKey().compareTo(end) < 0 || (inclusive && kv.getKey().compareTo(endKey) <= 0)) { + keyValues.add(kv); + } else { + reachedEnd = true; + break; + } + if (keyValues.size() >= maxNumberOfVals) { + break; + } + } + if (!keyValues.isEmpty()) { + waitForQueueSize(workerFutures, maxWorkerTasks - 1); + workerFutures.add(valueExecutors.submit(() -> { Review Comment: Can we replace the blocking Future calls here with `CompletableFuture` to avoid synchronous blocking and improve concurrency. Because Future.get() is called later to block execution until completion. 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.rocksdb.LiveFileMetaData; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators. + */ +public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable { + private final Table<K, V> table; + private final Codec<K> keyCodec; + private final ExecutorService iteratorExecutor; + private final ExecutorService valueExecutors; + private final int maxNumberOfVals; + private final OMMetadataManager metadataManager; + private final int maxIteratorTasks; + private final int maxWorkerTasks; + private final long logCountThreshold; + + private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class); + public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec, + int iteratorCount, int workerCount, int maxNumberOfValsInMemory, + long logThreshold) { + this.table = table; + this.keyCodec = keyCodec; + this.metadataManager = metadataManager; + this.maxIteratorTasks = 2 * iteratorCount; + this.maxWorkerTasks = workerCount * 2; + this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(iteratorCount * 2)); + this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<>(workerCount + 1)); + this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount)); + this.logCountThreshold = logThreshold; + } + + + private List<K> getBounds(K startKey, K endKey) throws IOException { + RDBStore store = (RDBStore) this.metadataManager.getStore(); + List<LiveFileMetaData> sstFiles = store.getDb().getSstFileList(); + Set<K> keys = new HashSet<>(); + for (LiveFileMetaData sstFile : sstFiles) { + keys.add(this.keyCodec.fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.keyCodec.fromPersistedFormat(sstFile.largestKey())); + } + if (startKey != null) { + keys.add(startKey); 
+ } + if (endKey != null) { + keys.add(endKey); + } + + return keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || key.compareTo(startKey) >= 0) + .filter(key -> endKey == null || endKey.compareTo(key) >= 0) + .collect(Collectors.toList()); + } + + private void waitForQueueSize(Queue<Future<?>> futures, int expectedSize) + throws ExecutionException, InterruptedException { + while (!futures.isEmpty() && futures.size() > expectedSize) { + Future<?> f = futures.poll(); + f.get(); Review Comment: `waitForQueueSize` blocks until some futures complete. If all worker threads are blocked waiting for new tasks while the queue is full, a deadlock may occur. Can we introduce a timeout-based polling mechanism in `waitForQueueSize` ? ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java: ########## @@ -111,23 +124,34 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { init(); for (String tableName : tables) { Table table = omMetadataManager.getTable(tableName); - - try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator - = table.iterator()) { + try { Review Comment: Any reason, we did not parallelize the tables processing here ? 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java: ########## @@ -87,15 +88,25 @@ public NSSummaryTask(ReconNamespaceSummaryManager long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong( OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD, OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT); + int maxKeysInMemory = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT); + int maxIterators = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT); + int maxWorkers = ozoneConfiguration.getInt( + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS, + ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_WORKERS_DEFAULT); this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO( reconNamespaceSummaryManager, reconOMMetadataManager, - nsSummaryFlushToDBMaxThreshold); + nsSummaryFlushToDBMaxThreshold, maxIterators, maxWorkers, maxKeysInMemory); this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy( reconNamespaceSummaryManager, reconOMMetadataManager, - ozoneConfiguration, nsSummaryFlushToDBMaxThreshold); + ozoneConfiguration, nsSummaryFlushToDBMaxThreshold, maxIterators, maxWorkers, maxKeysInMemory); this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS( - reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold); + reconNamespaceSummaryManager, reconOMMetadataManager, nsSummaryFlushToDBMaxThreshold, Review Comment: Since we are trying to achieve maximum concurrency and efficiency, I think some more improvements are needed when we want to use `ParallelTableIteratorOperation` for parallel iterators and key set iteration. Maybe we should modify `NSSummaryTask` to use `CompletableFuture.allOf(...)` instead of `invokeAll()`, reducing idle waiting time. 
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java: ########## @@ -111,23 +124,34 @@ public TaskResult reprocess(OMMetadataManager omMetadataManager) { init(); for (String tableName : tables) { Table table = omMetadataManager.getTable(tableName); - - try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator - = table.iterator()) { + try { if (tableHandlers.containsKey(tableName)) { - Triple<Long, Long, Long> details = - tableHandlers.get(tableName).getTableSizeAndCount(iterator); - objectCountMap.put(getTableCountKeyFromTable(tableName), - details.getLeft()); - unReplicatedSizeMap.put( - getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle()); - replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName), - details.getRight()); + try (TableIterator<String, ? extends Table.KeyValue<String, ?>> iterator + = table.iterator()) { + Triple<Long, Long, Long> details = + tableHandlers.get(tableName).getTableSizeAndCount(iterator); + objectCountMap.put(getTableCountKeyFromTable(tableName), + details.getLeft()); + unReplicatedSizeMap.put( + getUnReplicatedSizeKeyFromTable(tableName), details.getMiddle()); + replicatedSizeMap.put(getReplicatedSizeKeyFromTable(tableName), + details.getRight()); + } } else { - long count = Iterators.size(iterator); - objectCountMap.put(getTableCountKeyFromTable(tableName), count); + AtomicLong count = new AtomicLong(0); + try (ParallelTableIteratorOperation<String, byte[]> parallelTableIteratorOperation = + new ParallelTableIteratorOperation<>(omMetadataManager, Review Comment: Not sure if we need this just for the count of table records. We can optimize this part. 
Please check #8069 ########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTaskWithFSO.java: ########## @@ -164,67 +178,99 @@ public Pair<Integer, Boolean> processWithFSO(OMUpdateEventBatch events, return new ImmutablePair<>(seekPos, false); } if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { - if (!flushAndCommitNSToDB(nsSummaryMap)) { + if (!flushAndCommitNSToDB(nsSummaryMap, nsSummaryFlushToDBMaxThreshold)) { return new ImmutablePair<>(seekPos, false); } seekPos = eventCounter + 1; } } // flush and commit left out entries at end - if (!flushAndCommitNSToDB(nsSummaryMap)) { + if (!flushAndCommitNSToDB(nsSummaryMap, 0)) { return new ImmutablePair<>(seekPos, false); } LOG.debug("Completed a process run of NSSummaryTaskWithFSO"); return new ImmutablePair<>(seekPos, true); } public boolean reprocessWithFSO(OMMetadataManager omMetadataManager) { - Map<Long, NSSummary> nsSummaryMap = new HashMap<>(); + Map<Long, NSSummary> nsSummaryMap = new ConcurrentHashMap<>(); try { - Table<String, OmDirectoryInfo> dirTable = - omMetadataManager.getDirectoryTable(); - try (TableIterator<String, - ? extends Table.KeyValue<String, OmDirectoryInfo>> - dirTableIter = dirTable.iterator()) { - while (dirTableIter.hasNext()) { - Table.KeyValue<String, OmDirectoryInfo> kv = dirTableIter.next(); - OmDirectoryInfo directoryInfo = kv.getValue(); - handlePutDirEvent(directoryInfo, nsSummaryMap); - if (nsSummaryMap.size() >= nsSummaryFlushToDBMaxThreshold) { - if (!flushAndCommitNSToDB(nsSummaryMap)) { - return false; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); Review Comment: Locks are not necessary in highly parallel environments when using concurrent data structures like ConcurrentHashMap. Can we use atomic operations (e.g., `ConcurrentHashMap.computeIfAbsent()`) instead of explicit locks? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
