[ https://issues.apache.org/jira/browse/HADOOP-19472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017583#comment-18017583 ]
ASF GitHub Bot commented on HADOOP-19472: ----------------------------------------- manika137 commented on code in PR #7669: URL: https://github.com/apache/hadoop/pull/7669#discussion_r2314852491 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java: ########## @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR; +import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS; + +/** + * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization. + */ +public final class WriteThreadPoolSizeManager implements Closeable { + + /* Maximum allowed size for the thread pool. */ + private final int maxThreadPoolSize; + /* Executor for periodically monitoring CPU usage. */ + private final ScheduledExecutorService cpuMonitorExecutor; + /* Thread pool whose size is dynamically managed. */ + private volatile ExecutorService boundedThreadPool; + /* Lock to ensure thread-safe updates to the thread pool. */ + private final Lock lock = new ReentrantLock(); + /* New computed max size for the thread pool after adjustment. */ + private volatile int newMaxPoolSize; + /* Logger instance for logging events from WriteThreadPoolSizeManager. */ + private static final Logger LOG = LoggerFactory.getLogger( + WriteThreadPoolSizeManager.class); + /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */ + private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager> + POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>(); + /* Name of the filesystem associated with this manager. */ + private final String filesystemName; + /* Initial size for the thread pool when created. */ + private final int initialPoolSize; + /* Initially available heap memory. */ + private final long initialAvailableHeapMemory; + /* The configuration instance. */ + private final AbfsConfiguration abfsConfiguration; + + /** + * Private constructor to initialize the write thread pool and CPU monitor executor + * based on system resources and ABFS configuration. + * + * @param filesystemName Name of the ABFS filesystem. + * @param abfsConfiguration Configuration containing pool size parameters. 
+ */ + private WriteThreadPoolSizeManager(String filesystemName, + AbfsConfiguration abfsConfiguration) { + this.filesystemName = filesystemName; + this.abfsConfiguration = abfsConfiguration; + int availableProcessors = Runtime.getRuntime().availableProcessors(); + /* Get the heap space available when the instance is created */ + this.initialAvailableHeapMemory = getAvailableHeapMemory(); + /* Compute the max pool size */ + int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory); + + /* Get the initial pool size from config, fallback to at least 1 */ + this.initialPoolSize = Math.max(1, + abfsConfiguration.getWriteMaxConcurrentRequestCount()); + + /* Set the upper bound for the thread pool size */ + this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize); + + /* Initialize the bounded thread pool executor */ + this.boundedThreadPool = Executors.newFixedThreadPool(initialPoolSize); + + ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool; + executor.setKeepAliveTime( + abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + + /* Create a scheduled executor for CPU monitoring and pool adjustment */ + this.cpuMonitorExecutor = Executors.newScheduledThreadPool( + abfsConfiguration.getWriteCorePoolSize()); + } + + public AbfsConfiguration getAbfsConfiguration() { + return abfsConfiguration; + } + + /** + * Calculates the max thread pool size using a multiplier based on + * memory per core. Higher memory per core results in a larger multiplier. + * + * @param availableProcessors Number of CPU cores. + * @return Computed max thread pool size. 
+ */ + private int getComputedMaxPoolSize(final int availableProcessors, long initialAvailableHeapMemory) { + LOG.debug("The available heap space in GB {} ", initialAvailableHeapMemory); + LOG.debug("The number of available processors is {} ", availableProcessors); + int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, availableProcessors); + LOG.debug("The max thread pool size is {} ", maxpoolSize); + return maxpoolSize; + } + + /** + * Calculates the available heap memory in gigabytes. + * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory + * allowed for the JVM and subtracts the currently used memory (total - free) + * to determine how much heap memory is still available. + * The result is rounded up to the nearest gigabyte. + * + * @return the available heap memory in gigabytes + */ + private long getAvailableHeapMemory() { + Runtime runtime = Runtime.getRuntime(); + long maxMemory = runtime.maxMemory(); + long usedMemory = runtime.totalMemory() - runtime.freeMemory(); + long availableHeapBytes = maxMemory - usedMemory; + return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE; + } + + /** + * Returns aggressive thread count = CPU cores × multiplier based on heap tier. + */ + private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) { + int multiplier; + if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) { + multiplier = abfsConfiguration.getLowTierMemoryMultiplier(); + } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) { + multiplier = abfsConfiguration.getMediumTierMemoryMultiplier(); + } else { + multiplier = abfsConfiguration.getHighTierMemoryMultiplier(); + } + return availableProcessors * multiplier; + } + + /** + * Returns the singleton instance of WriteThreadPoolSizeManager for the given filesystem. + * + * @param filesystemName the name of the filesystem. + * @param abfsConfiguration the configuration for the ABFS. + * + * @return the singleton instance. 
+ */ + public static synchronized WriteThreadPoolSizeManager getInstance( + String filesystemName, AbfsConfiguration abfsConfiguration) { + /* Check if an instance already exists in the map for the given filesystem */ + WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get( + filesystemName); + + /* If an existing instance is found, return it */ + if (existingInstance != null && existingInstance.boundedThreadPool != null + && !existingInstance.boundedThreadPool.isShutdown()) { + return existingInstance; + } + + /* Otherwise, create a new instance, put it in the map, and return it */ + LOG.debug( + "Creating new WriteThreadPoolSizeManager instance for filesystem: {}", + filesystemName); + WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager( + filesystemName, abfsConfiguration); + POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance); + return newInstance; + } + + /** + * Adjusts the thread pool size to the specified maximum pool size. + * + * @param newMaxPoolSize the new maximum pool size. + */ + private void adjustThreadPoolSize(int newMaxPoolSize) { + synchronized (this) { + ThreadPoolExecutor threadPoolExecutor + = ((ThreadPoolExecutor) boundedThreadPool); + int currentCorePoolSize = threadPoolExecutor.getCorePoolSize(); + + if (newMaxPoolSize >= currentCorePoolSize) { + threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize); + threadPoolExecutor.setCorePoolSize(newMaxPoolSize); + } else { + threadPoolExecutor.setCorePoolSize(newMaxPoolSize); + threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize); + } + + LOG.debug("The thread pool size is: {} ", newMaxPoolSize); + LOG.debug("The pool size is: {} ", threadPoolExecutor.getPoolSize()); + LOG.debug("The active thread count is: {}", threadPoolExecutor.getActiveCount()); + } + } + + /** + * Starts monitoring the CPU utilization and adjusts the thread pool size accordingly. 
+ */ + synchronized void startCPUMonitoring() { + cpuMonitorExecutor.scheduleAtFixedRate(() -> { + double cpuUtilization = getCpuUtilization(); + LOG.debug("Current CPU Utilization is this: {}", cpuUtilization); + try { + adjustThreadPoolSizeBasedOnCPU(cpuUtilization); + } catch (InterruptedException e) { + throw new RuntimeException(String.format( + "Thread pool size adjustment interrupted for filesystem %s", + filesystemName), e); + } + }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), TimeUnit.SECONDS); + } + + /** + * Gets the current CPU utilization. + * + * @return the CPU utilization as a percentage (0.0 to 1.0). + */ + private double getCpuUtilization() { + OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean(); + if (osBean instanceof com.sun.management.OperatingSystemMXBean) { + com.sun.management.OperatingSystemMXBean sunOsBean + = (com.sun.management.OperatingSystemMXBean) osBean; + double cpuLoad = sunOsBean.getSystemCpuLoad(); + if (cpuLoad >= 0) { Review Comment: If cpuLoad is negative (getSystemCpuLoad() returns a negative value such as -1.0 when the CPU load is unavailable), should we log it? > ABFS: Enhance performance of ABFS driver for write-heavy workloads > ------------------------------------------------------------------ > > Key: HADOOP-19472 > URL: https://issues.apache.org/jira/browse/HADOOP-19472 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.4.1 > Reporter: Anmol Asrani > Assignee: Anmol Asrani > Priority: Minor > Labels: pull-request-available > Fix For: 3.4.1 > > > The goal of this work item is to enhance the performance of ABFS Driver for > write-heavy workloads by improving concurrency within writes. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org