This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 583b44dfa8 [enhancement](broker) Improve the availability of broker load (#10699) 583b44dfa8 is described below commit 583b44dfa83dc49bc743a5c217956077d951d240 Author: HB <hubia...@corp.netease.com> AuthorDate: Tue Aug 9 17:00:48 2022 +0800 [enhancement](broker) Improve the availability of broker load (#10699) --- .../conf/apache_hdfs_broker.conf | 2 +- .../org/apache/doris/broker/hdfs/BrokerConfig.java | 2 +- .../doris/broker/hdfs/FileSystemManager.java | 97 ++++++---------------- 3 files changed, 27 insertions(+), 74 deletions(-) diff --git a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf index 92a30ac24e..8466de02cd 100644 --- a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf +++ b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf @@ -27,7 +27,7 @@ broker_ipc_port = 8000 # client session will be deleted if not receive ping after this time -client_expire_seconds = 300 +client_expire_seconds = 1800 # Advanced configurations # sys_log_dir = ${BROKER_HOME}/log diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java index c5a7b66acd..6381f16db2 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java @@ -29,7 +29,7 @@ public class BrokerConfig extends ConfigBase { public static int hdfs_write_buffer_size_kb = 1024; @ConfField - public static int client_expire_seconds = 300; + public static int client_expire_seconds = 1800; @ConfField public static int broker_ipc_port = 8000; diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index b522d21819..262875de25 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -120,7 +120,6 @@ public class FileSystemManager { clientContextManager = new ClientContextManager(handleManagementPool); readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10; writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10; - handleManagementPool.schedule(new FileSystemExpirationChecker(), 0, TimeUnit.SECONDS); } private static String preparePrincipal(String originalPrincipal) throws UnknownHostException { @@ -212,7 +211,6 @@ public class FileSystemManager { } String hdfsUgi = username + "," + password; FileSystemIdentity fileSystemIdentity = null; - BrokerFileSystem fileSystem = null; if (authentication.equals(AUTHENTICATION_SIMPLE)) { fileSystemIdentity = new FileSystemIdentity(host, hdfsUgi); } else { @@ -242,19 +240,9 @@ public class FileSystemManager { e.getMessage()); } } - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - fileSystem = cachedFileSystem.get(fileSystemIdentity); - if (fileSystem == null) { - // it means it is removed concurrently by checker thread - return null; - } + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); fileSystem.getLock().lock(); try { - if (!cachedFileSystem.containsKey(fileSystemIdentity)) { - // this means the file system is closed by file system checker thread - // it is a corner case - return null; - } if (fileSystem.getDFSFileSystem() == null) { logger.info("create file system for new path: " + path); UserGroupInformation ugi = null; @@ -406,20 +394,10 @@ public class FileSystemManager { String disableCache = properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true"); String s3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi); - BrokerFileSystem fileSystem = null; cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - fileSystem = cachedFileSystem.get(fileSystemIdentity); - if (fileSystem == null) { - // it means it is removed concurrently by checker thread - return null; - } + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); fileSystem.getLock().lock(); try { - if (!cachedFileSystem.containsKey(fileSystemIdentity)) { - // this means the file system is closed by file system checker thread - // it is a corner case - return null; - } if (fileSystem.getDFSFileSystem() == null) { logger.info("create file system for new path " + path); // create a new filesystem @@ -462,20 +440,9 @@ public class FileSystemManager { String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost(); String ks3aUgi = accessKey + "," + secretKey; FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi); - BrokerFileSystem fileSystem = null; - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - fileSystem = cachedFileSystem.get(fileSystemIdentity); - if (fileSystem == null) { - // it means it is removed concurrently by checker thread - return null; - } + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); fileSystem.getLock().lock(); try { - if (!cachedFileSystem.containsKey(fileSystemIdentity)) { - // this means the file system is closed by file system checker thread - // it is a corner case - return null; - } if (fileSystem.getDFSFileSystem() == null) { logger.info("could not find file system for path " + path + " create a new one"); // create a new filesystem @@ -549,23 +516,9 @@ public class FileSystemManager { e.getMessage()); } } - - - BrokerFileSystem fileSystem = null; - cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); - fileSystem = cachedFileSystem.get(fileSystemIdentity); - if (fileSystem == null) { - // it means it is removed concurrently by checker thread - return null; - } + BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity); fileSystem.getLock().lock(); - try { - if (!cachedFileSystem.containsKey(fileSystemIdentity)) { - // this means the file system is closed by file system checker thread - // it is a corner case - return null; - } // create a new filesystem Configuration conf = new Configuration(); for (Map.Entry<String, String> propElement : properties.entrySet()) { @@ -895,29 +848,29 @@ public class FileSystemManager { } return readLength; } - - class FileSystemExpirationChecker implements Runnable { - @Override - public void run() { - try { - for (BrokerFileSystem fileSystem : cachedFileSystem.values()) { - if (fileSystem.isExpired(BrokerConfig.client_expire_seconds)) { - logger.info("file system " + fileSystem + " is expired, close and remove it"); - fileSystem.getLock().lock(); - try { - fileSystem.closeFileSystem(); - } catch (Throwable t) { - logger.error("errors while close file system", t); - } finally { - cachedFileSystem.remove(fileSystem.getIdentity()); - fileSystem.getLock().unlock(); - } - } + + private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity) { + BrokerFileSystem brokerFileSystem = null; + if (cachedFileSystem.containsKey(fileSystemIdentity)) { + brokerFileSystem = cachedFileSystem.get(fileSystemIdentity); + if (brokerFileSystem.isExpired(BrokerConfig.client_expire_seconds)) { + logger.info("file system " + brokerFileSystem + " is expired, close and remove it"); + brokerFileSystem.getLock().lock(); + try { + brokerFileSystem.closeFileSystem(); + } catch (Throwable t) { + logger.error("errors while close file system", t); + } finally { + cachedFileSystem.remove(brokerFileSystem.getIdentity()); + brokerFileSystem.getLock().unlock(); + brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); + cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); } - } finally { - FileSystemManager.this.handleManagementPool.schedule(this, 60, TimeUnit.SECONDS); } + } else { + brokerFileSystem = new BrokerFileSystem(fileSystemIdentity); + cachedFileSystem.put(fileSystemIdentity, brokerFileSystem); } - + return brokerFileSystem; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org