This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b37f944f23 [INLONG-11734][SDK] Optimize SDK stop processing flow 
(#11735)
b37f944f23 is described below

commit b37f944f2308c1c8c34bb4aa088d87153d7a4598
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Sun Feb 9 12:55:01 2025 +0800

    [INLONG-11734][SDK] Optimize SDK stop processing flow (#11735)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java |  5 +-
 .../sdk/dataproxy/common/ProxyClientConfig.java    | 15 +++--
 .../inlong/sdk/dataproxy/metric/MetricConfig.java  | 19 +++---
 .../sdk/dataproxy/metric/MetricDataHolder.java     | 11 +++-
 .../sdk/dataproxy/network/http/HttpClientMgr.java  | 24 ++++---
 .../sdk/dataproxy/network/tcp/TcpClientMgr.java    | 74 +++++++++++++++++++---
 .../inlong/sdk/dataproxy/sender/BaseSender.java    |  5 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     | 33 ++++++++++
 8 files changed, 147 insertions(+), 39 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 72c1a6ac67..623a318188 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -67,6 +67,7 @@ public class BaseMsgSenderFactory {
 
     public void close() {
         int totalSenderCnt;
+        long startTime = System.currentTimeMillis();
         logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
         senderCacheLock.writeLock().lock();
         try {
@@ -77,8 +78,8 @@ public class BaseMsgSenderFactory {
         } finally {
             senderCacheLock.writeLock().unlock();
         }
-        logger.info("MsgSenderFactory({}) closed, release {} inlong senders",
-                this.factoryNo, totalSenderCnt);
+        logger.info("MsgSenderFactory({}) closed, release {} inlong senders, 
cost {} ms",
+                this.factoryNo, totalSenderCnt, System.currentTimeMillis() - 
startTime);
     }
 
     public void removeClient(BaseSender msgSender) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 274b24f16d..acb6db81ba 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -379,11 +379,16 @@ public class ProxyClientConfig implements Cloneable {
         return metricConfig.isEnableMetric();
     }
 
-    public void setMetricConfig(MetricConfig metricConfig) {
-        if (metricConfig == null) {
-            throw new IllegalArgumentException("metricConfig is null");
-        }
-        this.metricConfig = metricConfig;
+    public void setEnableMetric(boolean enableMetric) {
+        this.metricConfig.setEnableMetric(enableMetric);
+    }
+
+    public void setMetricOutIntvlInfo(long metricOutIntvlMs, long 
metricOutWarnIntMs) {
+        this.metricConfig.setMetricOutIntvlInfo(metricOutIntvlMs, 
metricOutWarnIntMs);
+    }
+
+    public void setMetricKeyMaskInfos(boolean maskGroupId, boolean 
maskStreamId) {
+        this.metricConfig.setMetricKeyMaskInfos(maskGroupId, maskStreamId);
     }
 
     public MetricConfig getMetricConfig() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
index 6c4f7e2dae..e689781379 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java
@@ -69,9 +69,16 @@ public class MetricConfig {
         return maskStreamId;
     }
 
-    public void setMetricOutIntvlMs(long metricOutIntvlMs) {
-        if (metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
-            this.metricOutIntvlMs = metricOutIntvlMs;
+    public void setMetricOutIntvlInfo(Long metricOutIntvlMs, Long 
metricOutWarnIntMs) {
+        if (metricOutIntvlMs != null) {
+            if (metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
+                this.metricOutIntvlMs = metricOutIntvlMs;
+            }
+        }
+        if (metricOutWarnIntMs != null) {
+            if (metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
+                this.metricOutWarnIntMs = metricOutWarnIntMs;
+            }
         }
     }
 
@@ -83,12 +90,6 @@ public class MetricConfig {
         return metricOutWarnIntMs;
     }
 
-    public void setMetricOutWarnIntMs(long metricOutWarnIntMs) {
-        if (metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) {
-            this.metricOutWarnIntMs = metricOutWarnIntMs;
-        }
-    }
-
     public long getDateFormatIntvlMs() {
         return dateFormatIntvlMs;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 222b8099b8..03e260789a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -85,11 +85,11 @@ public class MetricDataHolder implements Runnable {
     public void close() {
         logger.info("Metric DataHolder({}) closing ......", 
this.sender.getSenderId());
         // process rest data
-        this.outputExecutor.shutdown();
         long startTime = System.currentTimeMillis();
+        this.started = false;
+        this.outputExecutor.shutdown();
         outputMetricData(startTime, getOldIndex());
         outputMetricData(startTime, getCurIndex());
-        this.started = false;
         logger.info("Metric DataHolder({}) closed, cost = {} ms!",
                 this.sender.getSenderId(), System.currentTimeMillis() - 
startTime);
     }
@@ -181,7 +181,12 @@ public class MetricDataHolder implements Runnable {
             } catch (InterruptedException e) {
                 break;
             }
-        } while (selectedUnit.refCnt.get() > 0);
+        } while (started && selectedUnit.refCnt.get() > 0);
+        if (!started) {
+            logger.info("Metric DataHolder({}) closed, stop output metric 
info",
+                    sender.getSenderId());
+            return;
+        }
         StringBuilder strBuff = new StringBuilder(512);
         String rptContent = buildMetricReportInfo(strBuff, reportTime, 
selectedUnit);
         logger.info("Metric DataHolder({}) output metricInfo={}",
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index 72758a056e..bc4d2884c3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -123,16 +123,22 @@ public class HttpClientMgr implements ClientMgr {
             return;
         }
         int remainCnt = 0;
+        long stopTime = System.currentTimeMillis();
+        logger.info("ClientMgr({}) is closing...", this.sender.getSenderId());
         if (!messageCache.isEmpty()) {
-            long startTime = System.currentTimeMillis();
-            while (!messageCache.isEmpty()) {
-                if (System.currentTimeMillis() - startTime >= 
httpConfig.getHttpCloseWaitPeriodMs()) {
-                    break;
+            if (httpConfig.isDiscardHttpCacheWhenClosing()) {
+                messageCache.clear();
+            } else {
+                long startTime = System.currentTimeMillis();
+                while (!messageCache.isEmpty()) {
+                    if (System.currentTimeMillis() - startTime >= 
httpConfig.getHttpCloseWaitPeriodMs()) {
+                        break;
+                    }
+                    ProxyUtils.sleepSomeTime(100L);
                 }
-                ProxyUtils.sleepSomeTime(100L);
+                remainCnt = messageCache.size();
+                messageCache.clear();
             }
-            remainCnt = messageCache.size();
-            messageCache.clear();
         }
         workerServices.shutdown();
         if (httpClient != null) {
@@ -142,8 +148,8 @@ public class HttpClientMgr implements ClientMgr {
                 //
             }
         }
-        logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!",
-                this.sender.getSenderId(), remainCnt);
+        logger.info("ClientMgr({}) stopped, remain ({}) messages discarded, 
cost {} ms!",
+                this.sender.getSenderId(), remainCnt, 
System.currentTimeMillis() - stopTime);
     }
 
     @Override
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 5b4bf601c0..ec048f54ab 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -133,19 +133,14 @@ public class TcpClientMgr implements ClientMgr {
             timerObj.stop();
         }
         this.bootstrap.config().group().shutdownGracefully();
-
         this.maintThread.shutDown();
-        if (!channelMsgIdMap.isEmpty()) {
-            long startTime = System.currentTimeMillis();
-            while (!channelMsgIdMap.isEmpty()) {
-                if (System.currentTimeMillis() - startTime >= 
tcpConfig.getConCloseWaitPeriodMs()) {
-                    break;
-                }
-                ProxyUtils.sleepSomeTime(100L);
-            }
+        long startTime = System.currentTimeMillis();
+        if (!this.reqTimeouts.isEmpty()) {
+            notifyInflightMsgClosed();
         }
         this.activeNodes.clear();
-        logger.info("ClientMgr({}) stopped!", senderId);
+        logger.info("ClientMgr({}) stopped, release cost {} ms!",
+                senderId, System.currentTimeMillis() - startTime);
     }
 
     @Override
@@ -552,6 +547,65 @@ public class TcpClientMgr implements ClientMgr {
         return tmpBootstrap;
     }
 
+    public void notifyInflightMsgClosed() {
+        long curTime;
+        Timeout timeoutTask;
+        TcpNettyClient nettyTcpClient;
+        for (Integer messageId : this.reqTimeouts.keySet()) {
+            if (messageId == null) {
+                continue;
+            }
+            timeoutTask = this.reqTimeouts.remove(messageId);
+            if (timeoutTask != null) {
+                timeoutTask.cancel();
+            }
+            TcpCallFuture callFuture = this.reqObjects.remove(messageId);
+            if (callFuture == null) {
+                continue;
+            }
+            curTime = System.currentTimeMillis();
+            // find and process in using clients
+            nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new 
ProcessResult(ErrorCode.SDK_CLOSED)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) closed, callback 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+                            callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
+                            (System.currentTimeMillis() - curTime));
+                }
+                return;
+            }
+            // find and process in deleting clients
+            nettyTcpClient = 
deletingClientMaps.get(callFuture.getClientAddr());
+            if (nettyTcpClient != null
+                    && nettyTcpClient.getChanTermId() == 
callFuture.getChanTerm()) {
+                try {
+                    nettyTcpClient.getChannel().eventLoop().execute(
+                            () -> callFuture.onMessageAck(new 
ProcessResult(ErrorCode.SDK_CLOSED)));
+                } catch (Throwable ex) {
+                    if (callbackExceptCnt.shouldPrint()) {
+                        logger.info("ClientMgr({}) closed, callback2 
exception!",
+                                senderId, ex);
+                    }
+                } finally {
+                    nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+                            callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
+                            (System.currentTimeMillis() - curTime));
+                }
+            }
+        }
+    }
+
     private class MaintThread extends Thread {
 
         private volatile boolean bShutDown;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 8399da4dac..3720a98ecb 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -133,13 +133,16 @@ public abstract class BaseSender implements ConfigHolder {
         if (!senderStatus.compareAndSet(currentStatus, SENDER_STATUS_CLOSED)) {
             return;
         }
+        long startTime = System.currentTimeMillis();
+        logger.info("Sender({}) instance is stopping...", senderId);
         configManager.shutDown();
         clientMgr.stop();
         metricHolder.close();
         if (this.senderFactory != null) {
             this.senderFactory.removeClient(this);
         }
-        logger.info("Sender({}) instance stopped!", senderId);
+        logger.info("Sender({}) instance stopped, cost {} ms!",
+                senderId, System.currentTimeMillis() - startTime);
     }
 
     @Override
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 7d71133330..cdcc5bc6c8 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -31,6 +31,7 @@ import java.lang.management.ManagementFactory;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -150,6 +151,38 @@ public class ProxyUtils {
         return protocol + ":" + regionName + ":" + groupId;
     }
 
+    /**
+     * get valid attrs, remove invalid attrs
+     * @param attrsMap the input attrs
+     * @return valid attrs
+     */
+    public static Map<String, String> getValidAttrs(Map<String, String> 
attrsMap) {
+        if (attrsMap == null || attrsMap.isEmpty()) {
+            return attrsMap;
+        }
+        String tmpValue;
+        Map<String, String> validAttrsMap = new HashMap<>();
+        for (Map.Entry<String, String> entry : attrsMap.entrySet()) {
+            if (StringUtils.isBlank(entry.getKey())
+                    || entry.getKey().contains(AttributeConstants.SEPARATOR)
+                    || 
entry.getKey().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+                continue;
+            }
+            tmpValue = entry.getKey().trim();
+            if (ProxyUtils.SdkReservedWords.contains(tmpValue)) {
+                continue;
+            }
+            if (entry.getValue() != null) {
+                if (entry.getValue().contains(AttributeConstants.SEPARATOR)
+                        || 
entry.getValue().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
+                    continue;
+                }
+            }
+            validAttrsMap.put(tmpValue, entry.getValue());
+        }
+        return validAttrsMap;
+    }
+
     public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
         if (attrsMap == null || attrsMap.size() == 0) {
             return false;

Reply via email to