This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d062bc93 [INLONG-6738][DataProxy] New sink architecture integration 
(#6739)
5d062bc93 is described below

commit 5d062bc93ba62a0d4e09268d5b328fa6663dff55
Author: woofyzhao <490467...@qq.com>
AuthorDate: Wed Dec 7 17:27:19 2022 +0800

    [INLONG-6738][DataProxy] New sink architecture integration (#6739)
---
 .../inlong/common/heartbeat/ComponentHeartbeat.java     | 12 ++++++++----
 .../apache/inlong/common/heartbeat/HeartbeatMsg.java    |  7 ++++++-
 inlong-dataproxy/conf/common.properties                 |  5 +++++
 inlong-dataproxy/conf/dataproxy-pulsar.conf             | 15 ++++++++++-----
 .../inlong/dataproxy/config/RemoteConfigManager.java    | 11 ++++++-----
 .../apache/inlong/dataproxy/consts/ConfigConstants.java |  1 +
 .../inlong/dataproxy/heartbeat/HeartbeatManager.java    |  1 +
 .../dataproxy/sink/mq/MessageQueueZoneProducer.java     |  4 ++++
 .../inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java  | 17 +++++++++++++----
 .../inlong/dataproxy/source/ServerMessageHandler.java   |  6 ++++--
 .../apache/inlong/dataproxy/utils/ConfStringUtils.java  |  3 +++
 .../manager/service/heartbeat/HeartbeatManager.java     |  2 ++
 .../service/repository/DataProxyConfigRepository.java   |  3 +++
 13 files changed, 66 insertions(+), 21 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
index 567992918..96c173672 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java
@@ -31,6 +31,8 @@ public class ComponentHeartbeat {
 
     private String clusterTag;
 
+    private String extTag;
+
     private String clusterName;
 
     private String componentType;
@@ -49,11 +51,12 @@ public class ComponentHeartbeat {
     public ComponentHeartbeat() {
     }
 
-    public ComponentHeartbeat(String clusterTag, String clusterName,
-            String componentType, String ip, String port,
-            String inCharges, String protocolType) {
+    public ComponentHeartbeat(String clusterTag, String extTag,
+            String clusterName, String componentType, String ip,
+            String port, String inCharges, String protocolType) {
         this.nodeSrvStatus = NodeSrvStatus.OK;
         this.clusterTag = clusterTag;
+        this.extTag = extTag;
         this.clusterName = clusterName;
         this.componentType = componentType;
         this.ip = ip;
@@ -64,11 +67,12 @@ public class ComponentHeartbeat {
     }
 
     public ComponentHeartbeat(NodeSrvStatus nodeSrvStatus,
-            String clusterTag, String clusterName,
+            String clusterTag, String extTag, String clusterName,
             String componentType, String ip, String port,
             String inCharges, String protocolType, int loadValue) {
         this.nodeSrvStatus = nodeSrvStatus;
         this.clusterTag = clusterTag;
+        this.extTag = extTag;
         this.clusterName = clusterName;
         this.componentType = componentType;
         this.ip = ip;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
index c386c9b60..cbfddf470 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java
@@ -75,6 +75,11 @@ public class HeartbeatMsg {
      */
     private String clusterTag;
 
+    /**
+     * Ext tag of cluster, key=value pairs seperated by &
+     */
+    private String extTag;
+
     /**
      * Name of responsible person, separated by commas(,)
      */
@@ -96,7 +101,7 @@ public class HeartbeatMsg {
     private Integer load = 0xffff;
 
     public ComponentHeartbeat componentHeartbeat() {
-        return new ComponentHeartbeat(nodeSrvStatus, clusterTag, clusterName,
+        return new ComponentHeartbeat(nodeSrvStatus, clusterTag, extTag, 
clusterName,
                 componentType, ip, port, inCharges, protocolType, load);
     }
 }
diff --git a/inlong-dataproxy/conf/common.properties 
b/inlong-dataproxy/conf/common.properties
index 557a10cb2..a13d63ac6 100644
--- a/inlong-dataproxy/conf/common.properties
+++ b/inlong-dataproxy/conf/common.properties
@@ -24,6 +24,7 @@ manager.auth.secretKey=
 # proxy cluster name
 proxy.cluster.name=default_dataproxy
 proxy.cluster.tag=default_cluster
+proxy.cluster.extTag=default=true
 proxy.cluster.inCharges=admin
 # check interval of local config (millisecond)
 configCheckInterval=10000
@@ -38,3 +39,7 @@ prometheusHttpPort=9081
 audit.enable=true
 # audit proxy address
 audit.proxys=127.0.0.1:10081
+
+# remote config loader
+idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerIdTopicConfigLoader
+cacheClusterConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerCacheClusterConfigLoader
diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf 
b/inlong-dataproxy/conf/dataproxy-pulsar.conf
index 7cd457392..628d71ec4 100644
--- a/inlong-dataproxy/conf/dataproxy-pulsar.conf
+++ b/inlong-dataproxy/conf/dataproxy-pulsar.conf
@@ -116,17 +116,22 @@ agent1.channels.ch-msg6.fsyncPerTransaction = false
 agent1.channels.ch-msg6.fsyncInterval = 10
 
 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
-agent1.sinks.pulsar-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
 
 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
-agent1.sinks.pulsar-sink-msg2.type = 
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
 
 # For order message
 agent1.sinks.pulsar-sink-msg3.channel = ch-msg3
-agent1.sinks.pulsar-sink-msg3.type = 
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg3.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
 
 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5
-agent1.sinks.pulsar-sink-msg5.type = 
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg5.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
 
 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6
-agent1.sinks.pulsar-sink-msg6.type = 
org.apache.inlong.dataproxy.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg6.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+agent1.sinks.pulsar-sink-msg1.maxThreads=1
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
index f2f33ce85..dc2548edb 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java
@@ -108,18 +108,19 @@ public class RemoteConfigManager implements IRepository {
                 try {
                     String strReloadInterval = 
CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL);
                     instance.reloadInterval = 
NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS);
-                    //
-                    String ipListParserType = 
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE);
+
+                    String ipListParserType = 
CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE,
+                            DefaultManagerIpListParser.class.getName());
                     Class<? extends IManagerIpListParser> ipListParserClass;
                     ipListParserClass = (Class<? extends 
IManagerIpListParser>) Class
                             .forName(ipListParserType);
                     instance.ipListParser = 
ipListParserClass.getDeclaredConstructor().newInstance();
-                    //
+
                     SecureRandom random = new 
SecureRandom(String.valueOf(System.currentTimeMillis()).getBytes());
                     instance.managerIpListIndex.set(random.nextInt());
-                    //
+
                     instance.httpClient = constructHttpClient();
-                    //
+
                     instance.reload();
                     instance.setReloadTimer();
                     isInit = true;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 55f4e74ed..32dbfbb46 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -98,6 +98,7 @@ public class ConfigConstants {
     public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
     public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
     public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
+    public static final String PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag";
     public static final String PROXY_CLUSTER_INCHARGES = 
"proxy.cluster.inCharges";
     public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval";
     public static final String SOURCE_NO_TOPIC_ACCEPT = 
"source.topic.notfound.accept";
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
index 208f46bab..5ae3001e3 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java
@@ -144,6 +144,7 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
                 ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
         heartbeatMsg.setInCharges(commonProperties.getOrDefault(
                 ConfigConstants.PROXY_CLUSTER_INCHARGES, 
DEFAULT_CLUSTER_INCHARGES));
+        
heartbeatMsg.setExtTag(commonProperties.get(ConfigConstants.PROXY_CLUSTER_EXT_TAG));
 
         Map<String, String> groupIdMappings = 
configManager.getGroupIdMappingProperties();
         Map<String, Map<String, String>> streamIdMappings = 
configManager.getStreamIdMappingProperties();
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index bc85b6097..5c221f892 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.mq;
 
+import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -140,6 +141,9 @@ public class MessageQueueZoneProducer {
                 }
             }
             this.clusterList = newClusterList;
+            if (!ConfigManager.getInstance().isMqClusterReady()) {
+                ConfigManager.getInstance().updMqClusterStatus(true);
+            }
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 23333a0ac..49eccce4f 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.dataproxy.sink.mq.pulsar;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
 import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
@@ -27,6 +28,7 @@ import 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
 import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0;
 import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0;
 import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -113,9 +115,12 @@ public class PulsarHandler implements MessageQueueHandler {
             String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
             String authentication = config.getParams().get(KEY_AUTHENTICATION);
             Context context = sinkContext.getProducerContext();
-            this.client = PulsarClient.builder()
+            ClientBuilder builder = PulsarClient.builder();
+            if (StringUtils.isNotEmpty(authentication)) {
+                
builder.authentication(AuthenticationFactory.token(authentication));
+            }
+            this.client = builder
                     .serviceUrl(serviceUrl)
-                    
.authentication(AuthenticationFactory.token(authentication))
                     .ioThreads(context.getInteger(KEY_IOTHREADS, 1))
                     .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 
1073741824L), SizeUnit.BYTES)
                     
.connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10))
@@ -188,7 +193,7 @@ public class PulsarHandler implements MessageQueueHandler {
                 return false;
             }
             // topic
-            String producerTopic = this.getProducerTopic(baseTopic);
+            String producerTopic = this.getProducerTopic(baseTopic, idConfig);
             if (producerTopic == null) {
                 sinkContext.addSendResultMetric(event, event.getUid(), false, 
0);
                 sinkContext.getDispatchQueue().release(event.getSize());
@@ -243,11 +248,15 @@ public class PulsarHandler implements MessageQueueHandler 
{
     /**
      * getProducerTopic
      */
-    private String getProducerTopic(String baseTopic) {
+    private String getProducerTopic(String baseTopic, IdTopicConfig config) {
         StringBuilder builder = new StringBuilder();
         if (tenant != null) {
             builder.append(tenant).append("/");
         }
+        String namespace = this.namespace;
+        if (namespace == null) {
+            namespace = config.getParams().get(PulsarHandler.KEY_NAMESPACE);
+        }
         if (namespace != null) {
             builder.append(namespace).append("/");
         }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 442e70699..31c0b4ee8 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -40,7 +40,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
-import org.apache.flume.event.EventBuilder;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.common.msg.AttributeConstants;
@@ -57,6 +56,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.utils.DateTimeUtils;
 import org.apache.inlong.dataproxy.utils.InLongMsgVer;
 import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -556,7 +556,9 @@ public class ServerMessageHandler extends 
ChannelInboundHandlerAdapter {
                     strBuff.delete(0, strBuff.length());
                 }
                 final byte[] data = inLongMsg.buildArray();
-                Event event = EventBuilder.withBody(data, headers);
+                Event event = new ProxyEvent(groupId, streamIdEntry.getKey(), 
data,
+                        Long.parseLong(strDataTime), strRemoteIP);
+                event.getHeaders().putAll(headers);
                 inLongMsg.reset();
                 Pair<Boolean, String> evenProcType =
                         MessageUtils.getEventProcType(syncSend, proxySend);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
index 9f5cb34bc..22daf77d0 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java
@@ -26,6 +26,9 @@ public class ConfStringUtils {
         if (ip == null || ip.trim().isEmpty()) {
             return false;
         }
+        if (ip.equals("localhost")) {
+            ip = "127.0.0.1";
+        }
         boolean b = false;
         ip = ip.trim();
         if (ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index 8401c76a3..cc536f038 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -220,6 +220,7 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
         final String clusterName = componentHeartbeat.getClusterName();
         final String type = componentHeartbeat.getComponentType();
         final String clusterTag = componentHeartbeat.getClusterTag();
+        final String extTag = componentHeartbeat.getExtTag();
         Preconditions.checkNotNull(clusterTag, "cluster tag cannot be null");
         Preconditions.checkNotNull(type, "cluster type cannot be null");
         Preconditions.checkNotNull(clusterName, "cluster name cannot be null");
@@ -234,6 +235,7 @@ public class HeartbeatManager implements 
AbstractHeartbeatManager {
         cluster.setName(clusterName);
         cluster.setType(type);
         cluster.setClusterTags(clusterTag);
+        cluster.setExtTag(extTag);
         String inCharges = componentHeartbeat.getInCharges();
         if (StringUtils.isBlank(inCharges)) {
             inCharges = InlongConstants.ADMIN_USER;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
index adaa7c1f3..2a62f81f7 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java
@@ -262,6 +262,9 @@ public class DataProxyConfigRepository implements 
IRepository {
             // cache
             String clusterTag = proxyObj.getSetName();
             String extTag = proxyObj.getZone();
+            if (StringUtils.isEmpty(extTag)) {
+                continue;
+            }
             Map<String, List<CacheCluster>> cacheClusterZoneMap = 
cacheClusterMap.get(clusterTag);
             if (cacheClusterZoneMap != null) {
                 Map<String, String> subTagMap = 
tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));

Reply via email to