This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bb6cc76a02 [INLONG-10470][Audit] Optimize Audit Proxy configuration update retry logic (#10471)
bb6cc76a02 is described below

commit bb6cc76a02970816e6ac7a5bf63be84b414ff68d
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Jun 20 19:52:00 2024 +0800

    [INLONG-10470][Audit] Optimize Audit Proxy configuration update retry logic (#10471)
---
 .../org/apache/inlong/audit/utils/ThreadUtils.java | 33 ++++++++++++++
 .../org/apache/inlong/audit/AuditReporterImpl.java |  4 +-
 .../org/apache/inlong/audit/send/ProxyManager.java | 51 +++++++++++++++-------
 3 files changed, 70 insertions(+), 18 deletions(-)

diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java
new file mode 100644
index 0000000000..52101b344a
--- /dev/null
+++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ThreadUtils {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class);
+    public static void sleep(long milliseconds) {
+        try {
+            Thread.sleep(milliseconds);
+        } catch (InterruptedException e) {
+            LOGGER.error("Sleep was interrupted", e);
+        }
+    }
+}
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index d1748a5b4b..0f9d4b42c1 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -604,8 +604,8 @@ public class AuditReporterImpl implements Serializable {
         ProxyManager.getInstance().setManagerTimeout(timeoutMs);
     }
 
-    public void setAutoUpdateAuditProxy(boolean autoUpdateAuditProxy) {
-        ProxyManager.getInstance().setAutoUpdateAuditProxy(autoUpdateAuditProxy);
+    public void setAutoUpdateAuditProxy() {
+        ProxyManager.getInstance().setAutoUpdateAuditProxy();
     }
 
     public void setUpdateInterval(int updateInterval) {
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
index 159839d2fc..127609d2ab 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.entity.AuditComponent;
 import org.apache.inlong.audit.entity.AuditProxy;
 import org.apache.inlong.audit.entity.CommonResponse;
 import org.apache.inlong.audit.utils.HttpUtils;
+import org.apache.inlong.audit.utils.ThreadUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@ import java.net.InetSocketAddress;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -42,13 +44,14 @@ public class ProxyManager {
     private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
     private final static String GET_AUDIT_PROXY_API_PATH = "/inlong/manager/openapi/audit/getAuditProxy";
     private int timeoutMs = 10000;
-    private boolean autoUpdateAuditProxy = false;
     private int updateInterval = 60000;
     private String auditProxyApiUrl;
     private AuditComponent component;
     private String secretId;
     private String secretKey;
     private volatile boolean timerStarted = false;
+    private static final int MAX_RETRY_TIMES = 1440;
+    private static final int RETRY_INTERVAL_MS = 10000;
 
     private ProxyManager() {
     }
@@ -69,9 +72,6 @@ public class ProxyManager {
 
     public synchronized void setManagerConfig(AuditComponent component, String managerHost, String secretId,
             String secretKey) {
-        if (!managerHost.endsWith("/")) {
-            managerHost = managerHost + "/";
-        }
         if (!(managerHost.startsWith("http://") || managerHost.startsWith("https://"))) {
             managerHost = "http://" + managerHost;
         }
@@ -82,34 +82,52 @@ public class ProxyManager {
         this.secretId = secretId;
         this.secretKey = secretKey;
 
-        updateAuditProxy();
+        retryAsync();
+    }
 
-        if (autoUpdateAuditProxy) {
-            startTimer();
-            LOGGER.info("Auto update from manager");
-        }
+    private void retryAsync() {
+        CompletableFuture.runAsync(() -> {
+            long retryIntervalMs = RETRY_INTERVAL_MS;
+            for (int retryTime = 1; retryTime < MAX_RETRY_TIMES; retryTime++) {
+                try {
+                    if (updateAuditProxy()) {
+                        LOGGER.info("Audit proxy updated successfully");
+                        break;
+                    }
+                    LOGGER.warn("Failed to update audit proxy. Retrying in {} times...", retryTime);
+                } catch (Exception exception) {
+                    LOGGER.error("Failed to update audit proxy. Retrying in {} times...", retryTime, exception);
+                } finally {
+                    ThreadUtils.sleep(Math.min(retryIntervalMs, updateInterval));
+                    retryIntervalMs *= 2;
+                }
+            }
+        });
     }
 
-    private void updateAuditProxy() {
+    private boolean updateAuditProxy() {
         String response = HttpUtils.httpGet(component.getComponent(), auditProxyApiUrl, secretId, secretKey, timeoutMs);
         if (response == null) {
             LOGGER.error("Response is null: {} {} {} ", component.getComponent(), auditProxyApiUrl, secretId,
                     secretKey);
-            return;
+            return false;
         }
         CommonResponse<AuditProxy> commonResponse =
                 CommonResponse.fromJson(response, AuditProxy.class);
-        if (commonResponse == null) {
+        if (commonResponse == null || commonResponse.getData().isEmpty()) {
             LOGGER.error("No data in the response: {} {} {} {}", component.getComponent(), auditProxyApiUrl, secretId,
                     secretKey);
-            return;
+            return false;
         }
         HashSet<String> proxyList = new HashSet<>();
         for (AuditProxy auditProxy : commonResponse.getData()) {
             proxyList.add(auditProxy.toString());
         }
+
         setAuditProxy(proxyList);
+
         LOGGER.info("Get audit proxy from manager: {}", proxyList);
+        return true;
     }
 
     private synchronized void startTimer() {
@@ -117,7 +135,7 @@ public class ProxyManager {
             return;
         }
         timer.scheduleWithFixedDelay(this::updateAuditProxy,
-                0,
+                updateInterval,
                 updateInterval,
                 TimeUnit.MILLISECONDS);
         timerStarted = true;
@@ -127,8 +145,9 @@ public class ProxyManager {
         this.timeoutMs = timeoutMs;
     }
 
-    public void setAutoUpdateAuditProxy(boolean autoUpdateAuditProxy) {
-        this.autoUpdateAuditProxy = autoUpdateAuditProxy;
+    public void setAutoUpdateAuditProxy() {
+        startTimer();
+        LOGGER.info("Auto update Audit Proxy info from manager");
     }
 
     public void setUpdateInterval(int updateInterval) {

Reply via email to