This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 00035ee2e8 [INLONG-10594][Sort] Provide default kafka producer configuration (#10595)
00035ee2e8 is described below

commit 00035ee2e822a905aaa4768b1ef49b09b8fc192a
Author: vernedeng <verned...@apache.org>
AuthorDate: Wed Jul 10 16:36:20 2024 +0800

    [INLONG-10594][Sort] Provide default kafka producer configuration (#10595)
    
    * [INLONG-10594][Sort] Provide default kafka producer configuration
    ---------
    
    Co-authored-by: vernedeng <verned...@apache.org>
---
 .../standalone/config/pojo/CacheClusterConfig.java |  39 +-------
 .../sink/kafka/KafkaFederationSinkContext.java     |  11 +++
 .../sink/kafka/KafkaProducerCluster.java           | 106 +++++++++++++++------
 .../sink/kafka/KafkaProducerFederation.java        |  31 +++++-
 .../sink/pulsar/PulsarFederationSinkContext.java   |   8 ++
 5 files changed, 131 insertions(+), 64 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
index caee067831..83ae055e0d 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.standalone.config.pojo;
 
+import lombok.Data;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,45 +26,10 @@ import java.util.Map;
  * 
  * CacheClusterConfig
  */
+@Data
 public class CacheClusterConfig {
 
     private String clusterName;
     private Map<String, String> params = new HashMap<>();
 
-    /**
-     * get clusterName
-     * 
-     * @return the clusterName
-     */
-    public String getClusterName() {
-        return clusterName;
-    }
-
-    /**
-     * set clusterName
-     * 
-     * @param clusterName the clusterName to set
-     */
-    public void setClusterName(String clusterName) {
-        this.clusterName = clusterName;
-    }
-
-    /**
-     * get params
-     * 
-     * @return the params
-     */
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    /**
-     * set params
-     * 
-     * @param params the params to set
-     */
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 739b214b1e..f1dbdeb528 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -52,6 +53,7 @@ public class KafkaFederationSinkContext extends SinkContext {
     public static final String KEY_EVENT_HANDLER = "eventHandler";
 
     private KafkaNodeConfig kafkaNodeConfig;
+    private CacheClusterConfig cacheClusterConfig;
     private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
 
     public KafkaFederationSinkContext(String sinkName, Context context, Channel channel) {
@@ -82,6 +84,11 @@ public class KafkaFederationSinkContext extends SinkContext {
             this.taskConfig = newTaskConfig;
             this.sortTaskConfig = newSortTaskConfig;
 
+            CacheClusterConfig clusterConfig = new CacheClusterConfig();
+            clusterConfig.setClusterName(this.taskName);
+            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            this.cacheClusterConfig = clusterConfig;
+
             Map<String, KafkaIdConfig> fromTaskConfig = fromTaskConfig(taskConfig);
             Map<String, KafkaIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig);
             SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, fromTaskConfig, fromSortTaskConfig);
@@ -121,6 +128,10 @@ public class KafkaFederationSinkContext extends SinkContext {
         return kafkaNodeConfig;
     }
 
+    public CacheClusterConfig getCacheClusterConfig() {
+        return cacheClusterConfig;
+    }
+
     /**
      * get Topic by uid
      *
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index d0fda7c5aa..95a8d102ac 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -19,12 +19,12 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.flume.Context;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Properties;
 
 /** wrapper of kafka producer */
@@ -45,10 +46,9 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     private final String workerName;
     protected final KafkaNodeConfig nodeConfig;
+    protected final CacheClusterConfig cacheClusterConfig;
     private final KafkaFederationSinkContext sinkContext;
-    private final Context context;
 
-    private final String cacheClusterName;
     private LifecycleState state;
     private IEvent2KafkaRecordHandler handler;
 
@@ -56,36 +56,67 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     public KafkaProducerCluster(
             String workerName,
+            CacheClusterConfig cacheClusterConfig,
             KafkaNodeConfig nodeConfig,
             KafkaFederationSinkContext kafkaFederationSinkContext) {
         this.workerName = Preconditions.checkNotNull(workerName);
         this.nodeConfig = nodeConfig;
+        this.cacheClusterConfig = cacheClusterConfig;
         this.sinkContext = Preconditions.checkNotNull(kafkaFederationSinkContext);
-        this.context = new Context(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : Maps.newHashMap());
         this.state = LifecycleState.IDLE;
-        this.cacheClusterName = nodeConfig.getNodeName();
         this.handler = sinkContext.createEventHandler();
     }
 
     /** start and init kafka producer */
     @Override
     public void start() {
+        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+            startByNodeConfig();
+        } else {
+            startByCacheCluster();
+        }
+    }
+
+    private void startByCacheCluster() {
         this.state = LifecycleState.START;
+        if (cacheClusterConfig == null) {
+            LOG.error("start kafka producer cluster failed, cacheClusterConfig 
config is null");
+            return;
+        }
         try {
-            Properties props = new Properties();
-            props.putAll(context.getParameters());
-            props.put(
-                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
-                    context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName()));
-            props.put(
-                    ProducerConfig.ACKS_CONFIG,
-                    context.getString(ProducerConfig.ACKS_CONFIG, "all"));
-            props.put(
-                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                    nodeConfig.getBootstrapServers());
+            Properties props = defaultKafkaProperties();
+            props.putAll(cacheClusterConfig.getParams());
+            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
+            props.put(ProducerConfig.ACKS_CONFIG,
+                    cacheClusterConfig.getParams().getOrDefault(ProducerConfig.ACKS_CONFIG, "all"));
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    cacheClusterConfig.getParams().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+
             props.put(ProducerConfig.CLIENT_ID_CONFIG,
-                    nodeConfig.getClientId() + "-" + workerName);
-            LOG.info("init kafka client info: " + props);
+                    cacheClusterConfig.getParams().get(ProducerConfig.CLIENT_ID_CONFIG) + "-" + workerName);
+            LOG.info("init kafka client by cache cluster info: " + props);
+            producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
+            Preconditions.checkNotNull(producer);
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    private void startByNodeConfig() {
+        this.state = LifecycleState.START;
+        if (nodeConfig == null) {
+            LOG.error("start kafka producer cluster failed, node config is 
null");
+            return;
+        }
+        try {
+            Properties props = defaultKafkaProperties();
+            props.putAll(nodeConfig.getProperties() == null ? new HashMap<>() : nodeConfig.getProperties());
+            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName());
+            props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, nodeConfig.getBootstrapServers());
+            props.put(ProducerConfig.CLIENT_ID_CONFIG, nodeConfig.getClientId() + "-" + workerName);
+            LOG.info("init kafka client by node config info: " + props);
             producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
             Preconditions.checkNotNull(producer);
         } catch (Exception e) {
@@ -93,6 +124,35 @@ public class KafkaProducerCluster implements LifecycleAware {
         }
     }
 
+    public Properties defaultKafkaProperties() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "122880");
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "44740000");
+        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "86400000");
+        props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608");
+        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000");
+        props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ProducerConfig.RETRIES_CONFIG, "100000");
+        props.put(ProducerConfig.SEND_BUFFER_CONFIG, "524288");
+        props.put("mute.partition.error.max.times", "20");
+        props.put("mute.partition.max.percentage", "20");
+        props.put("rpc.timeout.ms", "30000");
+        props.put("topic.expiry.ms", "86400000");
+        props.put("unmute.partition.interval.ms", "600000");
+        props.put("metadata.retry.backoff.ms", "500");
+        props.put("metadata.fetch.timeout.ms", "1000");
+        props.put("maxThreads", "2");
+        props.put("enable.replace.partition.for.can.retry", "true");
+        props.put("enable.replace.partition.for.not.leader", "true");
+        props.put("enable.topic.partition.circuit.breaker", "true");
+        return props;
+    }
+
     /** stop and close kafka producer */
     @Override
     public void stop() {
@@ -159,12 +219,4 @@ public class KafkaProducerCluster implements LifecycleAware {
         }
     }
 
-    /**
-     * get cache cluster name
-     *
-     * @return cacheClusterName
-     */
-    public String getCacheClusterName() {
-        return cacheClusterName;
-    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 23e817dd8d..219519b907 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
@@ -45,6 +47,7 @@ public class KafkaProducerFederation implements Runnable {
     private KafkaNodeConfig nodeConfig;
     private KafkaProducerCluster cluster;
     private KafkaProducerCluster deleteCluster;
+    private CacheClusterConfig cacheClusterConfig;
 
     public KafkaProducerFederation(String workerName, KafkaFederationSinkContext context) {
         this.workerName = Preconditions.checkNotNull(workerName);
@@ -86,13 +89,39 @@ public class KafkaProducerFederation implements Runnable {
             LOG.error("failed to close delete cluster, ex={}", e.getMessage(), 
e);
         }
 
+        if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+            reloadByNodeConfig();
+        } else {
+            reloadByCacheClusterConfig();
+        }
+
+    }
+
+    private void reloadByCacheClusterConfig() {
         try {
+            if (cacheClusterConfig != null && !cacheClusterConfig.equals(context.getCacheClusterConfig())) {
+                return;
+            }
+            this.cacheClusterConfig = context.getCacheClusterConfig();
+            KafkaProducerCluster updateCluster =
+                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
+            updateCluster.start();
+            this.deleteCluster = cluster;
+            this.cluster = updateCluster;
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
 
+    }
+
+    private void reloadByNodeConfig() {
+        try {
             if (nodeConfig != null && context.getNodeConfig().getVersion() <= nodeConfig.getVersion()) {
                 return;
             }
             this.nodeConfig = context.getNodeConfig();
-            KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, nodeConfig, context);
+            KafkaProducerCluster updateCluster =
+                    new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
             updateCluster.start();
             this.deleteCluster = cluster;
             this.cluster = updateCluster;
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index f5fe9c5b96..cce771675f 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -50,6 +51,7 @@ public class PulsarFederationSinkContext extends SinkContext {
     public static final String KEY_EVENT_HANDLER = "eventHandler";
     private Map<String, PulsarIdConfig> idConfigMap = new ConcurrentHashMap<>();
     private PulsarNodeConfig pulsarNodeConfig;
+    private CacheClusterConfig cacheClusterConfig;
 
     public PulsarFederationSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
@@ -73,6 +75,12 @@ public class PulsarFederationSinkContext extends SinkContext {
             if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > pulsarNodeConfig.getVersion()) {
                 this.pulsarNodeConfig = requestNodeConfig;
             }
+
+            CacheClusterConfig clusterConfig = new CacheClusterConfig();
+            clusterConfig.setClusterName(this.taskName);
+            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+            this.cacheClusterConfig = clusterConfig;
+
             this.taskConfig = newTaskConfig;
             this.sortTaskConfig = newSortTaskConfig;
 

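For readers skimming the diff, here is a minimal, illustrative Java sketch (not part of the commit; the class and method names below are hypothetical) of the property precedence the new start paths follow: the built-in defaults from defaultKafkaProperties() are applied first, the configured node or cache-cluster params overwrite them, and a few keys such as the partitioner, bootstrap servers, and client id are set explicitly at the end.

    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.Map;
    import java.util.Properties;

    // Hypothetical helper mirroring the override order in startByNodeConfig()/startByCacheCluster():
    // defaults -> configured params -> explicitly pinned keys.
    public class ProducerPropsSketch {

        public static Properties buildProps(Map<String, String> configuredParams,
                                            String bootstrapServers,
                                            String clientId,
                                            String workerName) {
            Properties props = new Properties();
            // 1. Conservative defaults (a subset of defaultKafkaProperties() in the commit)
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
            // 2. Configured params win over the defaults
            if (configuredParams != null) {
                props.putAll(configuredParams);
            }
            // 3. A few keys are always pinned last, as in both new start paths
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    "org.apache.inlong.sort.standalone.sink.kafka.PartitionerSelector");
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId + "-" + workerName);
            return props;
        }
    }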