This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5d062bc93 [INLONG-6738][DataProxy] New sink architecture integration (#6739) 5d062bc93 is described below commit 5d062bc93ba62a0d4e09268d5b328fa6663dff55 Author: woofyzhao <490467...@qq.com> AuthorDate: Wed Dec 7 17:27:19 2022 +0800 [INLONG-6738][DataProxy] New sink architecture integration (#6739) --- .../inlong/common/heartbeat/ComponentHeartbeat.java | 12 ++++++++---- .../apache/inlong/common/heartbeat/HeartbeatMsg.java | 7 ++++++- inlong-dataproxy/conf/common.properties | 5 +++++ inlong-dataproxy/conf/dataproxy-pulsar.conf | 15 ++++++++++----- .../inlong/dataproxy/config/RemoteConfigManager.java | 11 ++++++----- .../apache/inlong/dataproxy/consts/ConfigConstants.java | 1 + .../inlong/dataproxy/heartbeat/HeartbeatManager.java | 1 + .../dataproxy/sink/mq/MessageQueueZoneProducer.java | 4 ++++ .../inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java | 17 +++++++++++++---- .../inlong/dataproxy/source/ServerMessageHandler.java | 6 ++++-- .../apache/inlong/dataproxy/utils/ConfStringUtils.java | 3 +++ .../manager/service/heartbeat/HeartbeatManager.java | 2 ++ .../service/repository/DataProxyConfigRepository.java | 3 +++ 13 files changed, 66 insertions(+), 21 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java index 567992918..96c173672 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/ComponentHeartbeat.java @@ -31,6 +31,8 @@ public class ComponentHeartbeat { private String clusterTag; + private String extTag; + private String clusterName; private String componentType; @@ -49,11 +51,12 @@ public class ComponentHeartbeat { public ComponentHeartbeat() { } - public ComponentHeartbeat(String clusterTag, String clusterName, - String 
componentType, String ip, String port, - String inCharges, String protocolType) { + public ComponentHeartbeat(String clusterTag, String extTag, + String clusterName, String componentType, String ip, + String port, String inCharges, String protocolType) { this.nodeSrvStatus = NodeSrvStatus.OK; this.clusterTag = clusterTag; + this.extTag = extTag; this.clusterName = clusterName; this.componentType = componentType; this.ip = ip; @@ -64,11 +67,12 @@ public class ComponentHeartbeat { } public ComponentHeartbeat(NodeSrvStatus nodeSrvStatus, - String clusterTag, String clusterName, + String clusterTag, String extTag, String clusterName, String componentType, String ip, String port, String inCharges, String protocolType, int loadValue) { this.nodeSrvStatus = nodeSrvStatus; this.clusterTag = clusterTag; + this.extTag = extTag; this.clusterName = clusterName; this.componentType = componentType; this.ip = ip; diff --git a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java index c386c9b60..cbfddf470 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java @@ -75,6 +75,11 @@ public class HeartbeatMsg { */ private String clusterTag; + /** + * Ext tag of cluster, key=value pairs separated by & + */ + private String extTag; + /** * Name of responsible person, separated by commas(,) */ @@ -96,7 +101,7 @@ public class HeartbeatMsg { private Integer load = 0xffff; public ComponentHeartbeat componentHeartbeat() { - return new ComponentHeartbeat(nodeSrvStatus, clusterTag, clusterName, + return new ComponentHeartbeat(nodeSrvStatus, clusterTag, extTag, clusterName, componentType, ip, port, inCharges, protocolType, load); } } diff --git a/inlong-dataproxy/conf/common.properties b/inlong-dataproxy/conf/common.properties index 557a10cb2..a13d63ac6 100644 --- 
a/inlong-dataproxy/conf/common.properties +++ b/inlong-dataproxy/conf/common.properties @@ -24,6 +24,7 @@ manager.auth.secretKey= # proxy cluster name proxy.cluster.name=default_dataproxy proxy.cluster.tag=default_cluster +proxy.cluster.extTag=default=true proxy.cluster.inCharges=admin # check interval of local config (millisecond) configCheckInterval=10000 @@ -38,3 +39,7 @@ prometheusHttpPort=9081 audit.enable=true # audit proxy address audit.proxys=127.0.0.1:10081 + +# remote config loader +idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerIdTopicConfigLoader +cacheClusterConfig.type=org.apache.inlong.dataproxy.config.loader.ManagerCacheClusterConfigLoader diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf b/inlong-dataproxy/conf/dataproxy-pulsar.conf index 7cd457392..628d71ec4 100644 --- a/inlong-dataproxy/conf/dataproxy-pulsar.conf +++ b/inlong-dataproxy/conf/dataproxy-pulsar.conf @@ -116,17 +116,22 @@ agent1.channels.ch-msg6.fsyncPerTransaction = false agent1.channels.ch-msg6.fsyncInterval = 10 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1 -agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink +agent1.sinks.pulsar-sink-msg1.maxThreads=1 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2 -agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink +agent1.sinks.pulsar-sink-msg1.maxThreads=1 # For order message agent1.sinks.pulsar-sink-msg3.channel = ch-msg3 -agent1.sinks.pulsar-sink-msg3.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink +agent1.sinks.pulsar-sink-msg1.maxThreads=1 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5 -agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink 
+agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink +agent1.sinks.pulsar-sink-msg1.maxThreads=1 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6 -agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink +agent1.sinks.pulsar-sink-msg1.maxThreads=1 diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java index f2f33ce85..dc2548edb 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/RemoteConfigManager.java @@ -108,18 +108,19 @@ public class RemoteConfigManager implements IRepository { try { String strReloadInterval = CommonPropertiesHolder.getString(KEY_CONFIG_CHECK_INTERVAL); instance.reloadInterval = NumberUtils.toLong(strReloadInterval, DEFAULT_HEARTBEAT_INTERVAL_MS); - // - String ipListParserType = CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE); + + String ipListParserType = CommonPropertiesHolder.getString(IManagerIpListParser.KEY_MANAGER_TYPE, + DefaultManagerIpListParser.class.getName()); Class<? extends IManagerIpListParser> ipListParserClass; ipListParserClass = (Class<? 
extends IManagerIpListParser>) Class .forName(ipListParserType); instance.ipListParser = ipListParserClass.getDeclaredConstructor().newInstance(); - // + SecureRandom random = new SecureRandom(String.valueOf(System.currentTimeMillis()).getBytes()); instance.managerIpListIndex.set(random.nextInt()); - // + instance.httpClient = constructHttpClient(); - // + instance.reload(); instance.setReloadTimer(); isInit = true; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java index 55f4e74ed..32dbfbb46 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java @@ -98,6 +98,7 @@ public class ConfigConstants { public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name"; public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy"; public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag"; + public static final String PROXY_CLUSTER_EXT_TAG = "proxy.cluster.extTag"; public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; public static final String CONFIG_CHECK_INTERVAL = "configCheckInterval"; public static final String SOURCE_NO_TOPIC_ACCEPT = "source.topic.notfound.accept"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java index 208f46bab..5ae3001e3 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/heartbeat/HeartbeatManager.java @@ -144,6 +144,7 @@ public class 
HeartbeatManager implements AbstractHeartbeatManager { ConfigConstants.PROXY_CLUSTER_NAME, DEFAULT_CLUSTER_NAME)); heartbeatMsg.setInCharges(commonProperties.getOrDefault( ConfigConstants.PROXY_CLUSTER_INCHARGES, DEFAULT_CLUSTER_INCHARGES)); + heartbeatMsg.setExtTag(commonProperties.get(ConfigConstants.PROXY_CLUSTER_EXT_TAG)); Map<String, String> groupIdMappings = configManager.getGroupIdMappingProperties(); Map<String, Map<String, String>> streamIdMappings = configManager.getStreamIdMappingProperties(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java index bc85b6097..5c221f892 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java @@ -17,6 +17,7 @@ package org.apache.inlong.dataproxy.sink.mq; +import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +141,9 @@ public class MessageQueueZoneProducer { } } this.clusterList = newClusterList; + if (!ConfigManager.getInstance().isMqClusterReady()) { + ConfigManager.getInstance().updMqClusterStatus(true); + } } catch (Throwable e) { LOG.error(e.getMessage(), e); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java index 23333a0ac..49eccce4f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java +++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java @@ -17,6 +17,7 @@ package org.apache.inlong.dataproxy.sink.mq.pulsar; +import org.apache.commons.lang3.StringUtils; import org.apache.flume.Context; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig; @@ -27,6 +28,7 @@ import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext; import org.apache.inlong.dataproxy.sink.mq.OrderBatchPackProfileV0; import org.apache.inlong.dataproxy.sink.mq.SimpleBatchPackProfileV0; import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -113,9 +115,12 @@ public class PulsarHandler implements MessageQueueHandler { String serviceUrl = config.getParams().get(KEY_SERVICE_URL); String authentication = config.getParams().get(KEY_AUTHENTICATION); Context context = sinkContext.getProducerContext(); - this.client = PulsarClient.builder() + ClientBuilder builder = PulsarClient.builder(); + if (StringUtils.isNotEmpty(authentication)) { + builder.authentication(AuthenticationFactory.token(authentication)); + } + this.client = builder .serviceUrl(serviceUrl) - .authentication(AuthenticationFactory.token(authentication)) .ioThreads(context.getInteger(KEY_IOTHREADS, 1)) .memoryLimit(context.getLong(KEY_MEMORYLIMIT, 1073741824L), SizeUnit.BYTES) .connectionsPerBroker(context.getInteger(KEY_CONNECTIONSPERBROKER, 10)) @@ -188,7 +193,7 @@ public class PulsarHandler implements MessageQueueHandler { return false; } // topic - String producerTopic = this.getProducerTopic(baseTopic); + String producerTopic = this.getProducerTopic(baseTopic, idConfig); if (producerTopic == null) { sinkContext.addSendResultMetric(event, event.getUid(), false, 0); 
sinkContext.getDispatchQueue().release(event.getSize()); @@ -243,11 +248,15 @@ public class PulsarHandler implements MessageQueueHandler { /** * getProducerTopic */ - private String getProducerTopic(String baseTopic) { + private String getProducerTopic(String baseTopic, IdTopicConfig config) { StringBuilder builder = new StringBuilder(); if (tenant != null) { builder.append(tenant).append("/"); } + String namespace = this.namespace; + if (namespace == null) { + namespace = config.getParams().get(PulsarHandler.KEY_NAMESPACE); + } if (namespace != null) { builder.append(namespace).append("/"); } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 442e70699..31c0b4ee8 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -40,7 +40,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.flume.ChannelException; import org.apache.flume.Event; import org.apache.flume.channel.ChannelProcessor; -import org.apache.flume.event.EventBuilder; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; import org.apache.inlong.common.msg.AttributeConstants; @@ -57,6 +56,7 @@ import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.utils.DateTimeUtils; import org.apache.inlong.dataproxy.utils.InLongMsgVer; import org.apache.inlong.dataproxy.utils.MessageUtils; +import org.apache.inlong.sdk.commons.protocol.ProxyEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -556,7 +556,9 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { strBuff.delete(0, strBuff.length()); } final 
byte[] data = inLongMsg.buildArray(); - Event event = EventBuilder.withBody(data, headers); + Event event = new ProxyEvent(groupId, streamIdEntry.getKey(), data, + Long.parseLong(strDataTime), strRemoteIP); + event.getHeaders().putAll(headers); inLongMsg.reset(); Pair<Boolean, String> evenProcType = MessageUtils.getEventProcType(syncSend, proxySend); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java index 9f5cb34bc..22daf77d0 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/ConfStringUtils.java @@ -26,6 +26,9 @@ public class ConfStringUtils { if (ip == null || ip.trim().isEmpty()) { return false; } + if (ip.equals("localhost")) { + ip = "127.0.0.1"; + } boolean b = false; ip = ip.trim(); if (ip.matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java index 8401c76a3..cc536f038 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java @@ -220,6 +220,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { final String clusterName = componentHeartbeat.getClusterName(); final String type = componentHeartbeat.getComponentType(); final String clusterTag = componentHeartbeat.getClusterTag(); + final String extTag = componentHeartbeat.getExtTag(); Preconditions.checkNotNull(clusterTag, "cluster tag cannot be null"); 
Preconditions.checkNotNull(type, "cluster type cannot be null"); Preconditions.checkNotNull(clusterName, "cluster name cannot be null"); @@ -234,6 +235,7 @@ public class HeartbeatManager implements AbstractHeartbeatManager { cluster.setName(clusterName); cluster.setType(type); cluster.setClusterTags(clusterTag); + cluster.setExtTag(extTag); String inCharges = componentHeartbeat.getInCharges(); if (StringUtils.isBlank(inCharges)) { inCharges = InlongConstants.ADMIN_USER; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java index adaa7c1f3..2a62f81f7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/repository/DataProxyConfigRepository.java @@ -262,6 +262,9 @@ public class DataProxyConfigRepository implements IRepository { // cache String clusterTag = proxyObj.getSetName(); String extTag = proxyObj.getZone(); + if (StringUtils.isEmpty(extTag)) { + continue; + } Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag); if (cacheClusterZoneMap != null) { Map<String, String> subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split(extTag));