This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new bb6cc76a02 [INLONG-10470][Audit] Optimize Audit Proxy configuration update retry logic (#10471) bb6cc76a02 is described below commit bb6cc76a02970816e6ac7a5bf63be84b414ff68d Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Thu Jun 20 19:52:00 2024 +0800 [INLONG-10470][Audit] Optimize Audit Proxy configuration update retry logic (#10471) --- .../org/apache/inlong/audit/utils/ThreadUtils.java | 33 ++++++++++++++ .../org/apache/inlong/audit/AuditReporterImpl.java | 4 +- .../org/apache/inlong/audit/send/ProxyManager.java | 51 +++++++++++++++------- 3 files changed, 70 insertions(+), 18 deletions(-) diff --git a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java new file mode 100644 index 0000000000..52101b344a --- /dev/null +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/utils/ThreadUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.audit.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThreadUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class); + public static void sleep(long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + LOGGER.error("Sleep was interrupted", e); + } + } +} diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java index d1748a5b4b..0f9d4b42c1 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java @@ -604,8 +604,8 @@ public class AuditReporterImpl implements Serializable { ProxyManager.getInstance().setManagerTimeout(timeoutMs); } - public void setAutoUpdateAuditProxy(boolean autoUpdateAuditProxy) { - ProxyManager.getInstance().setAutoUpdateAuditProxy(autoUpdateAuditProxy); + public void setAutoUpdateAuditProxy() { + ProxyManager.getInstance().setAutoUpdateAuditProxy(); } public void setUpdateInterval(int updateInterval) { diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java index 159839d2fc..127609d2ab 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ProxyManager.java @@ -21,6 +21,7 @@ import org.apache.inlong.audit.entity.AuditComponent; import org.apache.inlong.audit.entity.AuditProxy; import org.apache.inlong.audit.entity.CommonResponse; import org.apache.inlong.audit.utils.HttpUtils; +import org.apache.inlong.audit.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,7 @@ import java.net.InetSocketAddress; import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -42,13 +44,14 @@ public class ProxyManager { private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); private final static String GET_AUDIT_PROXY_API_PATH = "/inlong/manager/openapi/audit/getAuditProxy"; private int timeoutMs = 10000; - private boolean autoUpdateAuditProxy = false; private int updateInterval = 60000; private String auditProxyApiUrl; private AuditComponent component; private String secretId; private String secretKey; private volatile boolean timerStarted = false; + private static final int MAX_RETRY_TIMES = 1440; + private static final int RETRY_INTERVAL_MS = 10000; private ProxyManager() { } @@ -69,9 +72,6 @@ public class ProxyManager { public synchronized void setManagerConfig(AuditComponent component, String managerHost, String secretId, String secretKey) { - if (!managerHost.endsWith("/")) { - managerHost = managerHost + "/"; - } if (!(managerHost.startsWith("http://") || managerHost.startsWith("https://"))) { managerHost = "http://" + managerHost; } @@ -82,34 +82,52 @@ public class ProxyManager { this.secretId = secretId; this.secretKey = secretKey; - updateAuditProxy(); + retryAsync(); + } - if (autoUpdateAuditProxy) { - startTimer(); - LOGGER.info("Auto update from manager"); - } + private void retryAsync() { + CompletableFuture.runAsync(() -> { + long retryIntervalMs = RETRY_INTERVAL_MS; + for (int retryTime = 1; retryTime < MAX_RETRY_TIMES; retryTime++) { + try { + if (updateAuditProxy()) { + LOGGER.info("Audit proxy updated successfully"); + break; + } + LOGGER.warn("Failed to update audit proxy. Retrying in {} times...", retryTime); + } catch (Exception exception) { + LOGGER.error("Failed to update audit proxy. Retrying in {} times...", retryTime, exception); + } finally { + ThreadUtils.sleep(Math.min(retryIntervalMs, updateInterval)); + retryIntervalMs *= 2; + } + } + }); } - private void updateAuditProxy() { + private boolean updateAuditProxy() { String response = HttpUtils.httpGet(component.getComponent(), auditProxyApiUrl, secretId, secretKey, timeoutMs); if (response == null) { LOGGER.error("Response is null: {} {} {} ", component.getComponent(), auditProxyApiUrl, secretId, secretKey); - return; + return false; } CommonResponse<AuditProxy> commonResponse = CommonResponse.fromJson(response, AuditProxy.class); - if (commonResponse == null) { + if (commonResponse == null || commonResponse.getData().isEmpty()) { LOGGER.error("No data in the response: {} {} {} {}", component.getComponent(), auditProxyApiUrl, secretId, secretKey); - return; + return false; } HashSet<String> proxyList = new HashSet<>(); for (AuditProxy auditProxy : commonResponse.getData()) { proxyList.add(auditProxy.toString()); } + setAuditProxy(proxyList); + LOGGER.info("Get audit proxy from manager: {}", proxyList); + return true; } private synchronized void startTimer() { @@ -117,7 +135,7 @@ public class ProxyManager { return; } timer.scheduleWithFixedDelay(this::updateAuditProxy, - 0, + updateInterval, updateInterval, TimeUnit.MILLISECONDS); timerStarted = true; @@ -127,8 +145,9 @@ public class ProxyManager { this.timeoutMs = timeoutMs; } - public void setAutoUpdateAuditProxy(boolean autoUpdateAuditProxy) { - this.autoUpdateAuditProxy = autoUpdateAuditProxy; + public void setAutoUpdateAuditProxy() { + startTimer(); + LOGGER.info("Auto update Audit Proxy info from manager"); } public void setUpdateInterval(int updateInterval) {