This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f0a79bcb36 [INLONG-11629][SDK] Adjust the Sender initialization logic (#11630) f0a79bcb36 is described below commit f0a79bcb360675a0c290740ed89d528304c3e78f Author: Goson Zhang <4675...@qq.com> AuthorDate: Thu Jan 2 15:33:48 2025 +0800 [INLONG-11629][SDK] Adjust the Sender initialization logic (#11630) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dataproxy/network/Sender.java | 32 +++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index b56781c0ce..eb320b4a2a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -82,15 +82,28 @@ public class Sender { this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize(); this.threadPool = Executors.newCachedThreadPool(); this.clientMgr = new ClientMgr(configure, this, selfDefineFactory); + this.scanThread = new TimeoutScanThread(this, configure); + if (configure.isEnableMetric()) { + metricWorker = new MetricWorkerThread(configure, this); + } + logger.info("Sender({}) instance initialized!", this.instanceId); + } + + public void start() throws Exception { + if (!started.compareAndSet(false, true)) { + return; + } + this.clientMgr.start(); + this.scanThread.start(); ProxyConfigEntry proxyConfigEntry; try { proxyConfigEntry = this.clientMgr.getGroupIdConfigure(); setClusterId(proxyConfigEntry.getClusterId()); - } catch (Throwable e) { + } catch (Throwable ex) { if (configure.isOnlyUseLocalProxyConfig()) { - throw new Exception("Get local proxy configure failure!", e.getCause()); + throw new Exception("Get local proxy configure failure!", ex); } else { - throw new Exception("Visit manager error!", e.getCause()); + throw new Exception("Visit manager error!", ex); } } if (!proxyConfigEntry.isInterVisit()) { @@ -101,19 +114,6 @@ public class Sender { throw new Exception("In OutNetwork isNeedDataEncry must be true!"); } } - scanThread = new TimeoutScanThread(this, configure); - if (configure.isEnableMetric()) { - metricWorker = new MetricWorkerThread(configure, this); - } - logger.info("Sender({}) instance initialized!", this.instanceId); - } - - public void start() { - if (!started.compareAndSet(false, true)) { - return; - } - this.clientMgr.start(); - this.scanThread.start(); if (this.configure.isEnableMetric()) { this.metricWorker.start(); }