This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 583b44dfa8 [enhancement](broker) Improve the availability of broker 
load (#10699)
583b44dfa8 is described below

commit 583b44dfa83dc49bc743a5c217956077d951d240
Author: HB <hubia...@corp.netease.com>
AuthorDate: Tue Aug 9 17:00:48 2022 +0800

    [enhancement](broker) Improve the availability of broker load (#10699)
---
 .../conf/apache_hdfs_broker.conf                   |  2 +-
 .../org/apache/doris/broker/hdfs/BrokerConfig.java |  2 +-
 .../doris/broker/hdfs/FileSystemManager.java       | 97 ++++++----------------
 3 files changed, 27 insertions(+), 74 deletions(-)

diff --git a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf 
b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf
index 92a30ac24e..8466de02cd 100644
--- a/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf
+++ b/fs_brokers/apache_hdfs_broker/conf/apache_hdfs_broker.conf
@@ -27,7 +27,7 @@
 broker_ipc_port = 8000
 
 # client session will be deleted if not receive ping after this time
-client_expire_seconds = 300
+client_expire_seconds = 1800
 
 # Advanced configurations
 # sys_log_dir = ${BROKER_HOME}/log
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
index c5a7b66acd..6381f16db2 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
@@ -29,7 +29,7 @@ public class BrokerConfig extends ConfigBase {
     public static int hdfs_write_buffer_size_kb = 1024;
     
     @ConfField
-    public static int client_expire_seconds = 300;
+    public static int client_expire_seconds = 1800;
     
     @ConfField
     public static int broker_ipc_port = 8000;
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index b522d21819..262875de25 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -120,7 +120,6 @@ public class FileSystemManager {
         clientContextManager = new ClientContextManager(handleManagementPool);
         readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
         writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
-        handleManagementPool.schedule(new FileSystemExpirationChecker(), 0, 
TimeUnit.SECONDS);
     }
 
     private static String preparePrincipal(String originalPrincipal) throws 
UnknownHostException {
@@ -212,7 +211,6 @@ public class FileSystemManager {
         }
         String hdfsUgi = username + "," + password;
         FileSystemIdentity fileSystemIdentity = null;
-        BrokerFileSystem fileSystem = null;
         if (authentication.equals(AUTHENTICATION_SIMPLE)) {
             fileSystemIdentity = new FileSystemIdentity(host, hdfsUgi);
         } else {
@@ -242,19 +240,9 @@ public class FileSystemManager {
                         e.getMessage());
             }
         }
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
-        fileSystem = cachedFileSystem.get(fileSystemIdentity);
-        if (fileSystem == null) {
-            // it means it is removed concurrently by checker thread
-            return null;
-        }
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity);
         fileSystem.getLock().lock();
         try {
-            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
-                // this means the file system is closed by file system checker 
thread
-                // it is a corner case
-                return null;
-            }
             if (fileSystem.getDFSFileSystem() == null) {
                 logger.info("create file system for new path: " + path);
                 UserGroupInformation ugi = null;
@@ -406,20 +394,10 @@ public class FileSystemManager {
         String disableCache = 
properties.getOrDefault(FS_S3A_IMPL_DISABLE_CACHE, "true");
         String s3aUgi = accessKey + "," + secretKey;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
s3aUgi);
-        BrokerFileSystem fileSystem = null;
         cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
-        fileSystem = cachedFileSystem.get(fileSystemIdentity);
-        if (fileSystem == null) {
-            // it means it is removed concurrently by checker thread
-            return null;
-        }
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity);
         fileSystem.getLock().lock();
         try {
-            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
-                // this means the file system is closed by file system checker 
thread
-                // it is a corner case
-                return null;
-            }
             if (fileSystem.getDFSFileSystem() == null) {
                 logger.info("create file system for new path " + path);
                 // create a new filesystem
@@ -462,20 +440,9 @@ public class FileSystemManager {
         String host = KS3_SCHEME + "://" + endpoint + "/" + 
pathUri.getUri().getHost();
         String ks3aUgi = accessKey + "," + secretKey;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
ks3aUgi);
-        BrokerFileSystem fileSystem = null;
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
-        fileSystem = cachedFileSystem.get(fileSystemIdentity);
-        if (fileSystem == null) {
-            // it means it is removed concurrently by checker thread
-            return null;
-        }
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity);
         fileSystem.getLock().lock();
         try {
-            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
-                // this means the file system is closed by file system checker 
thread
-                // it is a corner case
-                return null;
-            }
             if (fileSystem.getDFSFileSystem() == null) {
                 logger.info("could not find file system for path " + path + " 
create a new one");
                 // create a new filesystem
@@ -549,23 +516,9 @@ public class FileSystemManager {
                         e.getMessage());
             }
         }
-
-
-        BrokerFileSystem fileSystem = null;
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
-        fileSystem = cachedFileSystem.get(fileSystemIdentity);
-        if (fileSystem == null) {
-            // it means it is removed concurrently by checker thread
-            return null;
-        }
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity);
         fileSystem.getLock().lock();
-
         try {
-            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
-                // this means the file system is closed by file system checker 
thread
-                // it is a corner case
-                return null;
-            }
             // create a new filesystem
             Configuration conf = new Configuration();
             for (Map.Entry<String, String> propElement : 
properties.entrySet()) {
@@ -895,29 +848,29 @@ public class FileSystemManager {
         }
         return readLength;
     }
-    
-    class FileSystemExpirationChecker implements Runnable {
-        @Override
-        public void run() {
-            try {
-                for (BrokerFileSystem fileSystem : cachedFileSystem.values()) {
-                    if 
(fileSystem.isExpired(BrokerConfig.client_expire_seconds)) {
-                        logger.info("file system " + fileSystem + " is 
expired, close and remove it");
-                        fileSystem.getLock().lock();
-                        try {
-                            fileSystem.closeFileSystem();
-                        } catch (Throwable t) {
-                            logger.error("errors while close file system", t);
-                        } finally {
-                            cachedFileSystem.remove(fileSystem.getIdentity());
-                            fileSystem.getLock().unlock();
-                        }
-                    }
+
+    private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity 
fileSystemIdentity) {
+        BrokerFileSystem brokerFileSystem = null;
+        if (cachedFileSystem.containsKey(fileSystemIdentity)) {
+            brokerFileSystem = cachedFileSystem.get(fileSystemIdentity);
+            if 
(brokerFileSystem.isExpired(BrokerConfig.client_expire_seconds)) {
+                logger.info("file system " + brokerFileSystem + " is expired, 
close and remove it");
+                brokerFileSystem.getLock().lock();
+                try {
+                    brokerFileSystem.closeFileSystem();
+                } catch (Throwable t) {
+                    logger.error("errors while close file system", t);
+                } finally {
+                    cachedFileSystem.remove(brokerFileSystem.getIdentity());
+                    brokerFileSystem.getLock().unlock();
+                    brokerFileSystem = new 
BrokerFileSystem(fileSystemIdentity);
+                    cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
                 }
-            } finally {
-                FileSystemManager.this.handleManagementPool.schedule(this, 60, 
TimeUnit.SECONDS);
             }
+        } else {
+            brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
+            cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
         }
-        
+        return brokerFileSystem;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to