This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d303033ad0 [INLONG-10597][Sort] Provide default pulsar producer configuration (#10600) d303033ad0 is described below commit d303033ad00f4b480c42be8bfb9bc96128bcfbac Author: vernedeng <verned...@apache.org> AuthorDate: Wed Jul 10 19:22:07 2024 +0800 [INLONG-10597][Sort] Provide default pulsar producer configuration (#10600) * [INLONG-10597][Sort] Provide default pulsar producer configuration --------- Co-authored-by: vernedeng <verned...@apache.rog> --- .../sink/pulsar/PulsarFederationSinkContext.java | 10 ++- .../sink/pulsar/PulsarProducerCluster.java | 80 ++++++++++++++-------- .../sink/pulsar/PulsarProducerFederation.java | 31 ++++++++- 3 files changed, 89 insertions(+), 32 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java index cce771675f..a686d50214 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java @@ -76,14 +76,14 @@ public class PulsarFederationSinkContext extends SinkContext { this.pulsarNodeConfig = requestNodeConfig; } + this.taskConfig = newTaskConfig; + this.sortTaskConfig = newSortTaskConfig; + CacheClusterConfig clusterConfig = new CacheClusterConfig(); clusterConfig.setClusterName(this.taskName); clusterConfig.setParams(this.sortTaskConfig.getSinkParams()); this.cacheClusterConfig = clusterConfig; - this.taskConfig = newTaskConfig; - this.sortTaskConfig = newSortTaskConfig; - Map<String, PulsarIdConfig> fromTaskConfig = fromTaskConfig(taskConfig); Map<String, PulsarIdConfig> fromSortTaskConfig = fromSortTaskConfig(sortTaskConfig); SortConfigMetricReporter.reportClusterDiff(clusterId, taskName, fromTaskConfig, fromSortTaskConfig); @@ -139,6 +139,10 @@ public class PulsarFederationSinkContext extends SinkContext { return pulsarNodeConfig; } + public CacheClusterConfig getCacheClusterConfig() { + return cacheClusterConfig; + } + public void addSendMetric(ProfileEvent currentRecord, String topic) { Map<String, String> dimensions = new HashMap<>(); dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId()); diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java index 1ae34191ec..4f2f80e0ab 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java @@ -19,10 +19,11 @@ package org.apache.inlong.sort.standalone.sink.pulsar; import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig; import org.apache.inlong.sort.standalone.utils.Constants; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; -import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.flume.Context; import org.apache.flume.Transaction; @@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.slf4j.Logger; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; @@ -75,26 +77,31 @@ public class PulsarProducerCluster implements LifecycleAware { private final String workerName; private final PulsarFederationSinkContext sinkContext; private final PulsarNodeConfig nodeConfig; - private final Context context; - private final String cacheClusterName; + private final CacheClusterConfig cacheClusterConfig; + private String cacheClusterName; + private Context context; private LifecycleState state; private IEvent2PulsarRecordHandler handler; /** * pulsar client */ + private ClientBuilder clientBuilder; private PulsarClient client; private ProducerBuilder<byte[]> baseBuilder; private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>(); - public PulsarProducerCluster(String workerName, PulsarNodeConfig nodeConfig, PulsarFederationSinkContext context) { + public PulsarProducerCluster( + String workerName, + CacheClusterConfig cacheClusterConfig, + PulsarNodeConfig nodeConfig, + PulsarFederationSinkContext context) { this.workerName = workerName; this.sinkContext = context; this.nodeConfig = nodeConfig; - this.context = new Context(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : Maps.newHashMap()); + this.cacheClusterConfig = cacheClusterConfig; this.state = LifecycleState.IDLE; - this.cacheClusterName = nodeConfig.getNodeName(); this.handler = sinkContext.createEventHandler(); } @@ -106,16 +113,10 @@ public class PulsarProducerCluster implements LifecycleAware { this.state = LifecycleState.START; try { // create pulsar client - ClientBuilder clientBuilder = PulsarClient.builder(); - String serviceUrl = nodeConfig.getServiceUrl(); - if (StringUtils.isBlank(serviceUrl)) { - throw new IllegalArgumentException("service url should not be null"); - } - - clientBuilder.serviceUrl(serviceUrl); - String authentication = nodeConfig.getToken(); - if (StringUtils.isNoneBlank(authentication)) { - clientBuilder.authentication(AuthenticationFactory.token(authentication)); + if (CommonPropertiesHolder.useUnifiedConfiguration()) { + initBuilderByNodeConfig(nodeConfig); + } else { + initBuilderByCacheCluster(cacheClusterConfig); } this.client = clientBuilder @@ -135,7 +136,7 @@ public class PulsarProducerCluster implements LifecycleAware { .maxPendingMessagesAcrossPartitions( context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000)) .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS) - .compressionType(this.getPulsarCompressionType()) + .compressionType(this.getPulsarCompressionType(context.getString(KEY_COMPRESSIONTYPE, "ZLIB"))) .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true)) .roundRobinRouterBatchingPartitionSwitchFrequency( context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10)) @@ -145,13 +146,45 @@ public class PulsarProducerCluster implements LifecycleAware { } } + private void initBuilderByCacheCluster(CacheClusterConfig cacheClusterConfig) { + this.cacheClusterName = cacheClusterConfig.getClusterName(); + this.context = new Context(cacheClusterConfig.getParams()); + clientBuilder = PulsarClient.builder(); + String serviceUrl = cacheClusterConfig.getParams().get(KEY_SERVICE_URL); + if (StringUtils.isBlank(serviceUrl)) { + throw new IllegalArgumentException("service url should not be null"); + } + + clientBuilder.serviceUrl(serviceUrl); + String authentication = cacheClusterConfig.getParams().get(KEY_AUTHENTICATION); + if (StringUtils.isNoneBlank(authentication)) { + clientBuilder.authentication(AuthenticationFactory.token(authentication)); + } + } + + private void initBuilderByNodeConfig(PulsarNodeConfig nodeConfig) { + this.cacheClusterName = nodeConfig.getNodeName(); + this.context = new Context(nodeConfig.getProperties() == null ? new HashMap<>() : nodeConfig.getProperties()); + + clientBuilder = PulsarClient.builder(); + String serviceUrl = nodeConfig.getServiceUrl(); + if (StringUtils.isBlank(serviceUrl)) { + throw new IllegalArgumentException("service url should not be null"); + } + + clientBuilder.serviceUrl(serviceUrl); + String authentication = nodeConfig.getToken(); + if (StringUtils.isNoneBlank(authentication)) { + clientBuilder.authentication(AuthenticationFactory.token(authentication)); + } + } + /** * getPulsarCompressionType * * @return CompressionType */ - private CompressionType getPulsarCompressionType() { - String type = nodeConfig.getCompressionType(); + private CompressionType getPulsarCompressionType(String type) { if (type == null) { return CompressionType.ZLIB; } @@ -275,13 +308,4 @@ public class PulsarProducerCluster implements LifecycleAware { return true; } - /** - * get cacheClusterName - * - * @return the cacheClusterName - */ - public String getCacheClusterName() { - return cacheClusterName; - } - } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java index fd4a0b95b5..d12fddb123 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java @@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar; import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; +import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import org.apache.flume.Transaction; @@ -41,6 +43,7 @@ public class PulsarProducerFederation { private final PulsarFederationSinkContext context; private Timer reloadTimer; private PulsarNodeConfig nodeConfig; + private CacheClusterConfig cacheClusterConfig; private PulsarProducerCluster cluster; private PulsarProducerCluster deleteCluster; @@ -108,12 +111,38 @@ public class PulsarProducerFederation { LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e); } + if (CommonPropertiesHolder.useUnifiedConfiguration()) { + reloadByNodeConfig(); + } else { + reloadByCacheClusterConfig(); + } + + } + + private void reloadByNodeConfig() { try { if (nodeConfig != null && context.getNodeConfig().getVersion() <= nodeConfig.getVersion()) { return; } this.nodeConfig = context.getNodeConfig(); - PulsarProducerCluster updateCluster = new PulsarProducerCluster(workerName, nodeConfig, context); + PulsarProducerCluster updateCluster = + new PulsarProducerCluster(workerName, cacheClusterConfig, nodeConfig, context); + updateCluster.start(); + this.deleteCluster = cluster; + this.cluster = updateCluster; + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } + + private void reloadByCacheClusterConfig() { + try { + if (cacheClusterConfig != null && !cacheClusterConfig.equals(context.getCacheClusterConfig())) { + return; + } + this.cacheClusterConfig = context.getCacheClusterConfig(); + PulsarProducerCluster updateCluster = + new PulsarProducerCluster(workerName, cacheClusterConfig, nodeConfig, context); updateCluster.start(); this.deleteCluster = cluster; this.cluster = updateCluster;