This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d303033ad0 [INLONG-10597][Sort] Provide default pulsar producer 
configuration (#10600)
d303033ad0 is described below

commit d303033ad00f4b480c42be8bfb9bc96128bcfbac
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Jul 10 19:22:07 2024 +0800

    [INLONG-10597][Sort] Provide default pulsar producer configuration (#10600)
    
    * [INLONG-10597][Sort] Provide default pulsar producer configuration
    
    ---------
    
    Co-authored-by: vernedeng <verned...@apache.rog>
---
 .../sink/pulsar/PulsarFederationSinkContext.java   | 10 ++-
 .../sink/pulsar/PulsarProducerCluster.java         | 80 ++++++++++++++--------
 .../sink/pulsar/PulsarProducerFederation.java      | 31 ++++++++-
 3 files changed, 89 insertions(+), 32 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index cce771675f..a686d50214 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -76,14 +76,14 @@ public class PulsarFederationSinkContext extends 
SinkContext {
                 this.pulsarNodeConfig = requestNodeConfig;
             }
 
+            this.taskConfig = newTaskConfig;
+            this.sortTaskConfig = newSortTaskConfig;
+
             CacheClusterConfig clusterConfig = new CacheClusterConfig();
             clusterConfig.setClusterName(this.taskName);
             clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
             this.cacheClusterConfig = clusterConfig;
 
-            this.taskConfig = newTaskConfig;
-            this.sortTaskConfig = newSortTaskConfig;
-
             Map<String, PulsarIdConfig> fromTaskConfig = 
fromTaskConfig(taskConfig);
             Map<String, PulsarIdConfig> fromSortTaskConfig = 
fromSortTaskConfig(sortTaskConfig);
             SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, 
fromTaskConfig, fromSortTaskConfig);
@@ -139,6 +139,10 @@ public class PulsarFederationSinkContext extends 
SinkContext {
         return pulsarNodeConfig;
     }
 
+    public CacheClusterConfig getCacheClusterConfig() {
+        return cacheClusterConfig;
+    }
+
     public void addSendMetric(ProfileEvent currentRecord, String topic) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 1ae34191ec..4f2f80e0ab 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -19,10 +19,11 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
 
 import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Transaction;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
@@ -75,26 +77,31 @@ public class PulsarProducerCluster implements 
LifecycleAware {
     private final String workerName;
     private final PulsarFederationSinkContext sinkContext;
     private final PulsarNodeConfig nodeConfig;
-    private final Context context;
-    private final String cacheClusterName;
+    private final CacheClusterConfig cacheClusterConfig;
+    private String cacheClusterName;
+    private Context context;
     private LifecycleState state;
     private IEvent2PulsarRecordHandler handler;
 
     /**
      * pulsar client
      */
+    private ClientBuilder clientBuilder;
     private PulsarClient client;
     private ProducerBuilder<byte[]> baseBuilder;
 
     private Map<String, Producer<byte[]>> producerMap = new 
ConcurrentHashMap<>();
 
-    public PulsarProducerCluster(String workerName, PulsarNodeConfig 
nodeConfig, PulsarFederationSinkContext context) {
+    public PulsarProducerCluster(
+            String workerName,
+            CacheClusterConfig cacheClusterConfig,
+            PulsarNodeConfig nodeConfig,
+            PulsarFederationSinkContext context) {
         this.workerName = workerName;
         this.sinkContext = context;
         this.nodeConfig = nodeConfig;
-        this.context = new Context(nodeConfig.getProperties() != null ? 
nodeConfig.getProperties() : Maps.newHashMap());
+        this.cacheClusterConfig = cacheClusterConfig;
         this.state = LifecycleState.IDLE;
-        this.cacheClusterName = nodeConfig.getNodeName();
         this.handler = sinkContext.createEventHandler();
     }
 
@@ -106,16 +113,10 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         this.state = LifecycleState.START;
         try {
             // create pulsar client
-            ClientBuilder clientBuilder = PulsarClient.builder();
-            String serviceUrl = nodeConfig.getServiceUrl();
-            if (StringUtils.isBlank(serviceUrl)) {
-                throw new IllegalArgumentException("service url should not be 
null");
-            }
-
-            clientBuilder.serviceUrl(serviceUrl);
-            String authentication = nodeConfig.getToken();
-            if (StringUtils.isNoneBlank(authentication)) {
-                
clientBuilder.authentication(AuthenticationFactory.token(authentication));
+            if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+                initBuilderByNodeConfig(nodeConfig);
+            } else {
+                initBuilderByCacheCluster(cacheClusterConfig);
             }
 
             this.client = clientBuilder
@@ -135,7 +136,7 @@ public class PulsarProducerCluster implements 
LifecycleAware {
                     .maxPendingMessagesAcrossPartitions(
                             
context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000))
                     .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), 
TimeUnit.MILLISECONDS)
-                    .compressionType(this.getPulsarCompressionType())
+                    
.compressionType(this.getPulsarCompressionType(context.getString(KEY_COMPRESSIONTYPE,
 "ZLIB")))
                     .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, 
true))
                     .roundRobinRouterBatchingPartitionSwitchFrequency(
                             
context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10))
@@ -145,13 +146,45 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         }
     }
 
+    private void initBuilderByCacheCluster(CacheClusterConfig 
cacheClusterConfig) {
+        this.cacheClusterName = cacheClusterConfig.getClusterName();
+        this.context = new Context(cacheClusterConfig.getParams());
+        clientBuilder = PulsarClient.builder();
+        String serviceUrl = 
cacheClusterConfig.getParams().get(KEY_SERVICE_URL);
+        if (StringUtils.isBlank(serviceUrl)) {
+            throw new IllegalArgumentException("service url should not be 
null");
+        }
+
+        clientBuilder.serviceUrl(serviceUrl);
+        String authentication = 
cacheClusterConfig.getParams().get(KEY_AUTHENTICATION);
+        if (StringUtils.isNoneBlank(authentication)) {
+            
clientBuilder.authentication(AuthenticationFactory.token(authentication));
+        }
+    }
+
+    private void initBuilderByNodeConfig(PulsarNodeConfig nodeConfig) {
+        this.cacheClusterName = nodeConfig.getNodeName();
+        this.context = new Context(nodeConfig.getProperties() == null ? new 
HashMap<>() : nodeConfig.getProperties());
+
+        clientBuilder = PulsarClient.builder();
+        String serviceUrl = nodeConfig.getServiceUrl();
+        if (StringUtils.isBlank(serviceUrl)) {
+            throw new IllegalArgumentException("service url should not be 
null");
+        }
+
+        clientBuilder.serviceUrl(serviceUrl);
+        String authentication = nodeConfig.getToken();
+        if (StringUtils.isNoneBlank(authentication)) {
+            
clientBuilder.authentication(AuthenticationFactory.token(authentication));
+        }
+    }
+
     /**
      * getPulsarCompressionType
      *
      * @return CompressionType
      */
-    private CompressionType getPulsarCompressionType() {
-        String type = nodeConfig.getCompressionType();
+    private CompressionType getPulsarCompressionType(String type) {
         if (type == null) {
             return CompressionType.ZLIB;
         }
@@ -275,13 +308,4 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         return true;
     }
 
-    /**
-     * get cacheClusterName
-     *
-     * @return the cacheClusterName
-     */
-    public String getCacheClusterName() {
-        return cacheClusterName;
-    }
-
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
index fd4a0b95b5..d12fddb123 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
 
 import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.flume.Transaction;
@@ -41,6 +43,7 @@ public class PulsarProducerFederation {
     private final PulsarFederationSinkContext context;
     private Timer reloadTimer;
     private PulsarNodeConfig nodeConfig;
+    private CacheClusterConfig cacheClusterConfig;
     private PulsarProducerCluster cluster;
     private PulsarProducerCluster deleteCluster;
 
@@ -108,12 +111,38 @@ public class PulsarProducerFederation {
             LOG.error("failed to close delete cluster, ex={}", e.getMessage(), 
e);
         }
 
+        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+            reloadByNodeConfig();
+        } else {
+            reloadByCacheClusterConfig();
+        }
+
+    }
+
+    private void reloadByNodeConfig() {
         try {
             if (nodeConfig != null && context.getNodeConfig().getVersion() <= 
nodeConfig.getVersion()) {
                 return;
             }
             this.nodeConfig = context.getNodeConfig();
-            PulsarProducerCluster updateCluster = new 
PulsarProducerCluster(workerName, nodeConfig, context);
+            PulsarProducerCluster updateCluster =
+                    new PulsarProducerCluster(workerName, cacheClusterConfig, 
nodeConfig, context);
+            updateCluster.start();
+            this.deleteCluster = cluster;
+            this.cluster = updateCluster;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private void reloadByCacheClusterConfig() {
+        try {
+            if (cacheClusterConfig != null && 
!cacheClusterConfig.equals(context.getCacheClusterConfig())) {
+                return;
+            }
+            this.cacheClusterConfig = context.getCacheClusterConfig();
+            PulsarProducerCluster updateCluster =
+                    new PulsarProducerCluster(workerName, cacheClusterConfig, 
nodeConfig, context);
             updateCluster.start();
             this.deleteCluster = cluster;
             this.cluster = updateCluster;

Reply via email to