This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b37f944f23 [INLONG-11734][SDK] Optimize SDK stop processing flow (#11735) b37f944f23 is described below commit b37f944f2308c1c8c34bb4aa088d87153d7a4598 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sun Feb 9 12:55:01 2025 +0800 [INLONG-11734][SDK] Optimize SDK stop processing flow (#11735) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 5 +- .../sdk/dataproxy/common/ProxyClientConfig.java | 15 +++-- .../inlong/sdk/dataproxy/metric/MetricConfig.java | 19 +++--- .../sdk/dataproxy/metric/MetricDataHolder.java | 11 +++- .../sdk/dataproxy/network/http/HttpClientMgr.java | 24 ++++--- .../sdk/dataproxy/network/tcp/TcpClientMgr.java | 74 +++++++++++++++++++--- .../inlong/sdk/dataproxy/sender/BaseSender.java | 5 +- .../inlong/sdk/dataproxy/utils/ProxyUtils.java | 33 ++++++++++ 8 files changed, 147 insertions(+), 39 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java index 72c1a6ac67..623a318188 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java @@ -67,6 +67,7 @@ public class BaseMsgSenderFactory { public void close() { int totalSenderCnt; + long startTime = System.currentTimeMillis(); logger.info("MsgSenderFactory({}) is closing", this.factoryNo); senderCacheLock.writeLock().lock(); try { @@ -77,8 +78,8 @@ public class BaseMsgSenderFactory { } finally { senderCacheLock.writeLock().unlock(); } - logger.info("MsgSenderFactory({}) closed, release {} inlong senders", - this.factoryNo, totalSenderCnt); + logger.info("MsgSenderFactory({}) closed, release {} inlong senders, cost {} ms", + this.factoryNo, totalSenderCnt, System.currentTimeMillis() - startTime); } public void removeClient(BaseSender msgSender) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java index 274b24f16d..acb6db81ba 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java @@ -379,11 +379,16 @@ public class ProxyClientConfig implements Cloneable { return metricConfig.isEnableMetric(); } - public void setMetricConfig(MetricConfig metricConfig) { - if (metricConfig == null) { - throw new IllegalArgumentException("metricConfig is null"); - } - this.metricConfig = metricConfig; + public void setEnableMetric(boolean enableMetric) { + this.metricConfig.setEnableMetric(enableMetric); + } + + public void setMetricOutIntvlInfo(long metricOutIntvlMs, long metricOutWarnIntMs) { + this.metricConfig.setMetricOutIntvlInfo(metricOutIntvlMs, metricOutWarnIntMs); + } + + public void setMetricKeyMaskInfos(boolean maskGroupId, boolean maskStreamId) { + this.metricConfig.setMetricKeyMaskInfos(maskGroupId, maskStreamId); } public MetricConfig getMetricConfig() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java index 6c4f7e2dae..e689781379 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricConfig.java @@ -69,9 +69,16 @@ public class MetricConfig { return maskStreamId; } - public void setMetricOutIntvlMs(long metricOutIntvlMs) { - if (metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) { - this.metricOutIntvlMs = metricOutIntvlMs; + public void setMetricOutIntvlInfo(Long metricOutIntvlMs, Long metricOutWarnIntMs) { + if (metricOutIntvlMs != null) { + if (metricOutIntvlMs >= MIN_METRIC_OUTPUT_INTVL_MS) { + this.metricOutIntvlMs = metricOutIntvlMs; + } + } + if (metricOutWarnIntMs != null) { + if (metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) { + this.metricOutWarnIntMs = metricOutWarnIntMs; + } } } @@ -83,12 +90,6 @@ public class MetricConfig { return metricOutWarnIntMs; } - public void setMetricOutWarnIntMs(long metricOutWarnIntMs) { - if (metricOutWarnIntMs >= MIN_METRIC_OUTPUT_INTVL_MS) { - this.metricOutWarnIntMs = metricOutWarnIntMs; - } - } - public long getDateFormatIntvlMs() { return dateFormatIntvlMs; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java index 222b8099b8..03e260789a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java @@ -85,11 +85,11 @@ public class MetricDataHolder implements Runnable { public void close() { logger.info("Metric DataHolder({}) closing ......", this.sender.getSenderId()); // process rest data - this.outputExecutor.shutdown(); long startTime = System.currentTimeMillis(); + this.started = false; + this.outputExecutor.shutdown(); outputMetricData(startTime, getOldIndex()); outputMetricData(startTime, getCurIndex()); - this.started = false; logger.info("Metric DataHolder({}) closed, cost = {} ms!", this.sender.getSenderId(), System.currentTimeMillis() - startTime); } @@ -181,7 +181,12 @@ public class MetricDataHolder implements Runnable { } catch (InterruptedException e) { break; } - } while (selectedUnit.refCnt.get() > 0); + } while (started && selectedUnit.refCnt.get() > 0); + if (!started) { + logger.info("Metric DataHolder({}) closed, stop output metric info", + sender.getSenderId()); + return; + } StringBuilder strBuff = new StringBuilder(512); String rptContent = buildMetricReportInfo(strBuff, reportTime, selectedUnit); logger.info("Metric DataHolder({}) output metricInfo={}", diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java index 72758a056e..bc4d2884c3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java @@ -123,16 +123,22 @@ public class HttpClientMgr implements ClientMgr { return; } int remainCnt = 0; + long stopTime = System.currentTimeMillis(); + logger.info("ClientMgr({}) is closing...", this.sender.getSenderId()); if (!messageCache.isEmpty()) { - long startTime = System.currentTimeMillis(); - while (!messageCache.isEmpty()) { - if (System.currentTimeMillis() - startTime >= httpConfig.getHttpCloseWaitPeriodMs()) { - break; + if (httpConfig.isDiscardHttpCacheWhenClosing()) { + messageCache.clear(); + } else { + long startTime = System.currentTimeMillis(); + while (!messageCache.isEmpty()) { + if (System.currentTimeMillis() - startTime >= httpConfig.getHttpCloseWaitPeriodMs()) { + break; + } + ProxyUtils.sleepSomeTime(100L); } - ProxyUtils.sleepSomeTime(100L); + remainCnt = messageCache.size(); + messageCache.clear(); } - remainCnt = messageCache.size(); - messageCache.clear(); } workerServices.shutdown(); if (httpClient != null) { @@ -142,8 +148,8 @@ public class HttpClientMgr implements ClientMgr { // } } - logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!", - this.sender.getSenderId(), remainCnt); + logger.info("ClientMgr({}) stopped, remain ({}) messages discarded, cost {} ms!", + this.sender.getSenderId(), remainCnt, System.currentTimeMillis() - stopTime); } @Override diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java index 5b4bf601c0..ec048f54ab 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java @@ -133,19 +133,14 @@ public class TcpClientMgr implements ClientMgr { timerObj.stop(); } this.bootstrap.config().group().shutdownGracefully(); - this.maintThread.shutDown(); - if (!channelMsgIdMap.isEmpty()) { - long startTime = System.currentTimeMillis(); - while (!channelMsgIdMap.isEmpty()) { - if (System.currentTimeMillis() - startTime >= tcpConfig.getConCloseWaitPeriodMs()) { - break; - } - ProxyUtils.sleepSomeTime(100L); - } + long startTime = System.currentTimeMillis(); + if (!this.reqTimeouts.isEmpty()) { + notifyInflightMsgClosed(); } this.activeNodes.clear(); - logger.info("ClientMgr({}) stopped!", senderId); + logger.info("ClientMgr({}) stopped, release cost {} ms!", + senderId, System.currentTimeMillis() - startTime); } @Override @@ -552,6 +547,65 @@ public class TcpClientMgr implements ClientMgr { return tmpBootstrap; } + public void notifyInflightMsgClosed() { + long curTime; + Timeout timeoutTask; + TcpNettyClient nettyTcpClient; + for (Integer messageId : this.reqTimeouts.keySet()) { + if (messageId == null) { + continue; + } + timeoutTask = this.reqTimeouts.remove(messageId); + if (timeoutTask != null) { + timeoutTask.cancel(); + } + TcpCallFuture callFuture = this.reqObjects.remove(messageId); + if (callFuture == null) { + continue; + } + curTime = System.currentTimeMillis(); + // find and process in using clients + nettyTcpClient = usingClientMaps.get(callFuture.getClientAddr()); + if (nettyTcpClient != null + && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) { + try { + nettyTcpClient.getChannel().eventLoop().execute( + () -> callFuture.onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED))); + } catch (Throwable ex) { + if (callbackExceptCnt.shouldPrint()) { + logger.info("ClientMgr({}) closed, callback exception!", + senderId, ex); + } + } finally { + nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm()); + baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(), + callFuture.getGroupId(), callFuture.getStreamId(), callFuture.getMsgCnt(), + (System.currentTimeMillis() - curTime)); + } + return; + } + // find and process in deleting clients + nettyTcpClient = deletingClientMaps.get(callFuture.getClientAddr()); + if (nettyTcpClient != null + && nettyTcpClient.getChanTermId() == callFuture.getChanTerm()) { + try { + nettyTcpClient.getChannel().eventLoop().execute( + () -> callFuture.onMessageAck(new ProcessResult(ErrorCode.SDK_CLOSED))); + } catch (Throwable ex) { + if (callbackExceptCnt.shouldPrint()) { + logger.info("ClientMgr({}) closed, callback2 exception!", + senderId, ex); + } + } finally { + nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm()); + baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(), + callFuture.getGroupId(), callFuture.getStreamId(), callFuture.getMsgCnt(), + (System.currentTimeMillis() - curTime)); + } + } + } + } + private class MaintThread extends Thread { private volatile boolean bShutDown; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java index 8399da4dac..3720a98ecb 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java @@ -133,13 +133,16 @@ public abstract class BaseSender implements ConfigHolder { if (!senderStatus.compareAndSet(currentStatus, SENDER_STATUS_CLOSED)) { return; } + long startTime = System.currentTimeMillis(); + logger.info("Sender({}) instance is stopping...", senderId); configManager.shutDown(); clientMgr.stop(); metricHolder.close(); if (this.senderFactory != null) { this.senderFactory.removeClient(this); } - logger.info("Sender({}) instance stopped!", senderId); + logger.info("Sender({}) instance stopped, cost {} ms!", + senderId, System.currentTimeMillis() - startTime); } @Override diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index 7d71133330..cdcc5bc6c8 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -31,6 +31,7 @@ import java.lang.management.ManagementFactory; import java.net.DatagramSocket; import java.net.InetAddress; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -150,6 +151,38 @@ public class ProxyUtils { return protocol + ":" + regionName + ":" + groupId; } + /** + * get valid attrs, remove invalid attrs + * @param attrsMap the input attrs + * @return valid attrs + */ + public static Map<String, String> getValidAttrs(Map<String, String> attrsMap) { + if (attrsMap == null || attrsMap.isEmpty()) { + return attrsMap; + } + String tmpValue; + Map<String, String> validAttrsMap = new HashMap<>(); + for (Map.Entry<String, String> entry : attrsMap.entrySet()) { + if (StringUtils.isBlank(entry.getKey()) + || entry.getKey().contains(AttributeConstants.SEPARATOR) + || entry.getKey().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) { + continue; + } + tmpValue = entry.getKey().trim(); + if (ProxyUtils.SdkReservedWords.contains(tmpValue)) { + continue; + } + if (entry.getValue() != null) { + if (entry.getValue().contains(AttributeConstants.SEPARATOR) + || entry.getValue().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) { + continue; + } + } + validAttrsMap.put(tmpValue, entry.getValue()); + } + return validAttrsMap; + } + public static boolean isAttrKeysValid(Map<String, String> attrsMap) { if (attrsMap == null || attrsMap.size() == 0) { return false;