This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 27217e34b [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545) 27217e34b is described below commit 27217e34ba57097019fe0ebdac013363e95e2cc2 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Aug 15 18:56:48 2022 +0800 [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545) --- .../dataproxy/config/pojo/MQClusterConfig.java | 13 +- .../dataproxy/sink/SimpleMessageTubeSink.java | 34 +- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 881 ++++++++------------- .../dataproxy/sink/common/MsgDedupHandler.java | 8 + .../dataproxy/sink/common/TubeProducerHolder.java | 277 +++++++ .../inlong/dataproxy/sink/common/TubeUtils.java | 80 ++ 6 files changed, 731 insertions(+), 562 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java index b49234b18..f1a87708e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java @@ -86,7 +86,7 @@ public class MQClusterConfig extends Context { private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200; private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout"; - private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60; + private static final long DEFAULT_TUBE_REQUEST_TIMEOUT = 20000L; private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count"; private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 80000L; @@ -100,6 +100,9 @@ public class MQClusterConfig extends Context { private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 
"netty_write_buffer_high_water_mark"; private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L; + private static final String HEARTBEAT_C2M_PERIOD_MS_MARK = "tube_heartbeat_period_ms"; + private static final long DEFAULT_HEARTBEAT_C2M_PERIOD_MS = 15000L; + private static final String RECOVER_THREAD_COUNT = "recover_thread_count"; private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1; @@ -159,6 +162,10 @@ public class MQClusterConfig extends Context { return getLong(NETTY_WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK); } + public long getTubeHeartbeatPeriodMs() { + return getLong(HEARTBEAT_C2M_PERIOD_MS_MARK, DEFAULT_HEARTBEAT_C2M_PERIOD_MS); + } + public int getRecoverThreadCount() { return getInteger(RECOVER_THREAD_COUNT, DEFAULT_RECOVER_THREAD_COUNT); } @@ -284,8 +291,8 @@ public class MQClusterConfig extends Context { return getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD, DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD); } - public int getTubeRequestTimeout() { - return getInteger(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT); + public long getTubeRpcTimeoutMs() { + return getLong(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT); } public String getLogTopic() { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java index 990b44cac..521f3c24f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java @@ -43,12 +43,12 @@ import org.apache.flume.source.shaded.guava.RateLimiter; import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.dataproxy.config.ConfigManager; import 
org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; -import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler; +import org.apache.inlong.dataproxy.sink.common.TubeUtils; import org.apache.inlong.dataproxy.utils.Constants; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.apache.inlong.tubemq.client.config.TubeClientConfig; @@ -57,7 +57,6 @@ import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory; import org.apache.inlong.tubemq.client.producer.MessageProducer; import org.apache.inlong.tubemq.client.producer.MessageSentCallback; import org.apache.inlong.tubemq.client.producer.MessageSentResult; -import org.apache.inlong.tubemq.corebase.Message; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; import org.apache.inlong.tubemq.corerpc.exception.OverflowException; import org.slf4j.Logger; @@ -346,6 +345,7 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable } class SinkTask implements Runnable { + private void sendMessage(Event event, String topic, AtomicBoolean flag, EventStat es) throws TubeClientException, InterruptedException { if (msgDedupHandler.judgeDupAndPutMsgSeqId( @@ -353,38 +353,12 @@ public class SimpleMessageTubeSink extends AbstractSink implements Configurable logger.info("{} agent package {} existed,just discard.", getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID)); } else { - Message message = this.parseEvent2Message(topic, event); - producer.sendMessage(message, new MyCallback(es)); + producer.sendMessage(TubeUtils.buildMessage( + topic, event, true), new MyCallback(es)); flag.set(true); } illegalTopicMap.remove(topic); } - - /** - * 
parseEvent2Message - * @param topic - * @param event - * @return - */ - private Message parseEvent2Message(String topic, Event event) { - Message message = new Message(topic, event.getBody()); - message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp()); - String streamId = ""; - if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { - streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { - streamId = event.getHeaders().get(AttributeConstants.INAME); - } - message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - // common attributes - Map<String, String> headers = event.getHeaders(); - message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID)); - message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID)); - message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC)); - message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME)); - message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP)); - return message; - } private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) { if (t instanceof TubeClientException) { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index 00abce075..e49903329 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -17,20 +17,19 @@ package org.apache.inlong.dataproxy.sink; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import 
java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; import org.apache.commons.collections.SetUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.Channel; import org.apache.flume.Context; @@ -42,382 +41,172 @@ import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.apache.flume.source.shaded.guava.RateLimiter; import org.apache.inlong.common.metric.MetricRegister; +import org.apache.inlong.common.monitor.LogCounter; +import org.apache.inlong.dataproxy.base.HighPriorityThreadFactory; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; -import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler; +import org.apache.inlong.dataproxy.sink.common.TubeProducerHolder; +import org.apache.inlong.dataproxy.sink.common.TubeUtils; import org.apache.inlong.dataproxy.utils.Constants; -import org.apache.inlong.dataproxy.utils.NetworkUtils; -import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.dataproxy.utils.FailoverChannelProcessorHolder; import 
org.apache.inlong.tubemq.client.exception.TubeClientException; -import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory; import org.apache.inlong.tubemq.client.producer.MessageProducer; import org.apache.inlong.tubemq.client.producer.MessageSentCallback; import org.apache.inlong.tubemq.client.producer.MessageSentResult; -import org.apache.inlong.tubemq.corebase.Message; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; -import org.apache.inlong.tubemq.corerpc.exception.OverflowException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TubeSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(TubeSink.class); - private static final MsgDedupHandler msgDedupHandler = new MsgDedupHandler(); - private static final ConcurrentHashMap<String, Long> illegalTopicMap = new ConcurrentHashMap<>(); + private static final MsgDedupHandler MSG_DEDUP_HANDLER = new MsgDedupHandler(); + private TubeProducerHolder producerHolder = null; private static final String TOPIC = "topic"; - // key: masterUrl - public Map<String, TubeMultiSessionFactory> sessionFactories; - public Map<String, List<TopicProducerInfo>> masterUrl2producers; - // key: topic - public Map<String, List<TopicProducerInfo>> producerInfoMap; private volatile boolean canTake = false; private volatile boolean canSend = false; + private volatile boolean isOverFlow = false; private ConfigManager configManager; private Map<String, String> topicProperties; private MQClusterConfig tubeConfig; + private String usedMasterAddr = null; private Set<String> masterHostAndPortLists; - // used for RoundRobin different cluster while send message - private AtomicInteger clusterIndex = new AtomicInteger(0); - private LinkedBlockingQueue<EventStat> resendQueue; - private LinkedBlockingQueue<Event> eventQueue; private RateLimiter diskRateLimiter; private Thread[] sinkThreadPool; private Map<String, String> dimensions; private 
DataProxyMetricItemSet metricItemSet; - - private boolean overflow = false; - - /** - * diff publish - */ - public void diffSetPublish(Set<String> originalSet, Set<String> endSet) { - if (SetUtils.isEqualSet(originalSet, endSet)) { - return; - } - - boolean changed = false; - Set<String> newTopics = new HashSet<>(); - for (String s : endSet) { - if (!originalSet.contains(s)) { - changed = true; - newTopics.add(s); - } - } - - if (changed) { - try { - initTopicSet(newTopics); - } catch (Exception e) { - logger.info("meta sink publish new topic fail.", e); - } - - logger.info("topics.properties has changed, trigger diff publish for {}", getName()); - topicProperties = configManager.getTopicProperties(); - } + private final AtomicBoolean started = new AtomicBoolean(false); + private static final LogCounter LOG_SINK_TASK_PRINTER = + new LogCounter(10, 100000, 60 * 1000); + private LinkedBlockingQueue<Event> eventQueue; + private LinkedBlockingQueue<EventStat> resendQueue; + private final AtomicLong cachedMsgCnt = new AtomicLong(0); + private final AtomicLong takenMsgCnt = new AtomicLong(0); + private final AtomicLong resendMsgCnt = new AtomicLong(0); + private final AtomicLong blankTopicDiscardMsgCnt = new AtomicLong(0); + private final AtomicLong frozenTopicDiscardMsgCnt = new AtomicLong(0); + private final AtomicLong dupDiscardMsgCnt = new AtomicLong(0); + private final AtomicLong inflightMsgCnt = new AtomicLong(0); + private final AtomicLong successMsgCnt = new AtomicLong(0); + // statistic thread + private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors + .newScheduledThreadPool(1, new HighPriorityThreadFactory("tubeSink-Printer-thread")); + + { + SCHEDULED_EXECUTOR_SERVICE.scheduleWithFixedDelay(new TubeStatsTask(), 30L, + 60L, TimeUnit.SECONDS); + logger.info("success to start performance statistic task!"); } - /** - * when masterUrlLists change, update tubeClient - * - * @param originalCluster previous masterHostAndPortList set - * 
@param endCluster new masterHostAndPortList set - */ - public void diffUpdateTubeClient(Set<String> originalCluster, Set<String> endCluster) { - if (SetUtils.isEqualSet(originalCluster, endCluster)) { - return; - } - // close - for (String masterUrl : originalCluster) { - - if (!endCluster.contains(masterUrl)) { - // step1: close and remove all related producers - List<TopicProducerInfo> producerInfoList = masterUrl2producers.get(masterUrl); - if (producerInfoList != null) { - for (TopicProducerInfo producerInfo : producerInfoList) { - producerInfo.shutdown(); - // remove from topic<->producer map - for (String topic : producerInfo.getTopicSet()) { - List<TopicProducerInfo> curTopicProducers = producerInfoMap.get(topic); - if (curTopicProducers != null) { - curTopicProducers.remove(producerInfo); - } - } - } - // remove from masterUrl<->producer map - masterUrl2producers.remove(masterUrl); - } - - // step2: close and remove related sessionFactories - TubeMultiSessionFactory sessionFactory = sessionFactories.get(masterUrl); - if (sessionFactory != null) { - try { - sessionFactory.shutdown(); - } catch (TubeClientException e) { - logger.error("destroy sessionFactory error in tubesink, MetaClientException {}", - e.getMessage()); - } - sessionFactories.remove(masterUrl); - } - - logger.info("close tubeClient of masterList:{}", masterUrl); - } - - } - // start new client - for (String masterUrl : endCluster) { - if (!originalCluster.contains(masterUrl)) { - TubeMultiSessionFactory sessionFactory = createConnection(masterUrl); - if (sessionFactory != null) { - List<Set<String>> topicGroups = partitionTopicSet(new HashSet<>(topicProperties.values())); - for (Set<String> topicSet : topicGroups) { - createTopicProducers(masterUrl, sessionFactory, topicSet); - } - logger.info("successfully start new tubeClient for the new masterList: {}", masterUrl); - } - } - } - + @Override + public void configure(Context context) { + logger.info(getName() + " configure from context: {}", 
context); + // initial parameters + configManager = ConfigManager.getInstance(); + tubeConfig = configManager.getMqClusterConfig(); + topicProperties = configManager.getTopicProperties(); masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet(); - } - - /** - * when there are multi clusters, pick producer based on round-robin - */ - private MessageProducer getProducer(String topic) throws TubeClientException { - if (producerInfoMap.containsKey(topic) && !producerInfoMap.get(topic).isEmpty()) { - - List<TopicProducerInfo> producers = producerInfoMap.get(topic); - // round-roubin dispatch - int currentIndex = clusterIndex.getAndIncrement(); - if (currentIndex > Integer.MAX_VALUE / 2) { - clusterIndex.set(0); - } - int producerIndex = currentIndex % producers.size(); - return producers.get(producerIndex).getProducer(); - } - return null; -// else { -// synchronized (this) { -// if (!producerInfoMap.containsKey(topic)) { -// if (producer == null || currentPublishTopicNum.get() >= tubeConfig.getMaxTopicsEachProducerHold()) { -// producer = sessionFactory.createProducer(); -// currentPublishTopicNum.set(0); -// } -// // publish topic -// producer.publish(topic); -// producerMap.put(topic, producer); -// currentPublishTopicNum.incrementAndGet(); -// } -// } -// return producerMap.get(topic); -// } - } - - private TubeClientConfig initTubeConfig(String masterUrl) throws Exception { - final TubeClientConfig tubeClientConfig = new TubeClientConfig(masterUrl); - tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount()); - tubeClientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount()); - tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount()); - tubeClientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark()); - tubeClientConfig.setHeartbeatPeriodMs(15000L); - tubeClientConfig.setRpcTimeoutMs(20000L); - - return 
tubeClientConfig; - } - - /** - * If this function is called successively without calling {@see #destroyConnection()}, only the - * first call has any effect. - * - * @throws FlumeException if an RPC client connection could not be opened - */ - private void initCreateConnection() throws FlumeException { - // check the TubeMQ address - if (masterHostAndPortLists == null || masterHostAndPortLists.isEmpty()) { - logger.warn("Failed to get TubeMQ Cluster, make sure register TubeMQ to manager successfully."); - return; - } - // if already connected, just skip - if (sessionFactories != null) { - return; - } - sessionFactories = new HashMap<>(); - for (String masterUrl : masterHostAndPortLists) { - createConnection(masterUrl); - } - - if (sessionFactories.size() == 0) { - throw new FlumeException("create tube sessionFactories err, please re-check"); - } - } - - private TubeMultiSessionFactory createConnection(String masterHostAndPortList) { - TubeMultiSessionFactory sessionFactory; - try { - TubeClientConfig conf = initTubeConfig(masterHostAndPortList); - sessionFactory = new TubeMultiSessionFactory(conf); - sessionFactories.put(masterHostAndPortList, sessionFactory); - } catch (Throwable e) { - logger.error("connect to tube meta error, maybe tube master set error/shutdown, please re-check", e); - throw new FlumeException("connect to tube meta error, maybe tube master set error/shutdown in progress, " - + "please re-check"); - } - return sessionFactory; - } - - private void destroyConnection() { - for (List<TopicProducerInfo> producerInfoList : producerInfoMap.values()) { - for (TopicProducerInfo producerInfo : producerInfoList) { - producerInfo.shutdown(); - } - } - producerInfoMap.clear(); - - if (sessionFactories != null) { - for (TubeMultiSessionFactory sessionFactory : sessionFactories.values()) { - try { - sessionFactory.shutdown(); - } catch (Exception e) { - logger.error("destroy sessionFactory error in tubesink: ", e); - } - } - } - sessionFactories.clear(); - 
masterUrl2producers.clear(); - logger.debug("closed meta producer"); - } - - /** - * partition topicSet to different group, each group is associated with a producer; - * if there are multi clusters, then each group is associated with a set of producer - */ - private List<Set<String>> partitionTopicSet(Set<String> topicSet) { - List<Set<String>> topicGroups = new ArrayList<>(); - - List<String> sortedList = new ArrayList<>(topicSet); - Collections.sort(sortedList); - int maxTopicsEachProducerHolder = tubeConfig.getMaxTopicsEachProducerHold(); - int cycle = sortedList.size() / maxTopicsEachProducerHolder; - int remainder = sortedList.size() % maxTopicsEachProducerHolder; - - for (int i = 0; i <= cycle; i++) { - // allocate topic - Set<String> subset = new HashSet<>(); - int startIndex = i * maxTopicsEachProducerHolder; - int endIndex = startIndex + maxTopicsEachProducerHolder - 1; - if (i == cycle) { - if (remainder == 0) { - continue; - } else { - endIndex = startIndex + remainder - 1; - } - } - for (int index = startIndex; index <= endIndex; index++) { - subset.add(sortedList.get(index)); - } - - topicGroups.add(subset); + // start message deduplication handler + MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(), + tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize()); + // only use first cluster address now + usedMasterAddr = getFirstClusterAddr(masterHostAndPortLists); + // create producer holder + producerHolder = new TubeProducerHolder(getName(), + usedMasterAddr, configManager.getMqClusterConfig()); + // initial TubeMQ configure + // initial resend queue size + int badEventQueueSize = tubeConfig.getBadEventQueueSize(); + Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0"); + resendQueue = new LinkedBlockingQueue<>(badEventQueueSize); + // initial sink thread pool + int threadNum = tubeConfig.getThreadNum(); + Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0"); + sinkThreadPool = new 
Thread[threadNum]; + // initial event queue size + int eventQueueSize = tubeConfig.getEventQueueSize(); + Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0"); + eventQueue = new LinkedBlockingQueue<>(eventQueueSize); + // initial disk rate limiter + if (tubeConfig.getDiskIoRatePerSec() != 0) { + diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec()); } - return topicGroups; - } - - /** - * create producer and publish topic - */ - private void createTopicProducers(String masterUrl, TubeMultiSessionFactory sessionFactory, - Set<String> topicGroup) { - - TopicProducerInfo info = new TopicProducerInfo(sessionFactory); - info.initProducer(); - Set<String> succTopicSet = info.publishTopic(topicGroup); - - masterUrl2producers.computeIfAbsent(masterUrl, k -> new ArrayList<>()).add(info); - - if (succTopicSet != null) { - for (String succTopic : succTopicSet) { - producerInfoMap.computeIfAbsent(succTopic, k -> new ArrayList<>()).add(info); - + // register configure change callback functions + configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() { + @Override + public void update() { + diffSetPublish(new HashSet<>(topicProperties.values()), + new HashSet<>(configManager.getTopicProperties().values())); } - } - } - - private void initTopicSet(Set<String> topicSet) throws Exception { - long startTime = System.currentTimeMillis(); - - if (sessionFactories != null) { - List<Set<String>> topicGroups = partitionTopicSet(topicSet); - for (Set<String> subset : topicGroups) { - for (Map.Entry<String, TubeMultiSessionFactory> entry : sessionFactories.entrySet()) { - createTopicProducers(entry.getKey(), entry.getValue(), subset); - } + }); + configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() { + @Override + public void update() { + diffUpdateTubeClient(masterHostAndPortLists, + configManager.getMqClusterUrl2Token().keySet()); } - logger.info(getName() + " producer is ready for topics : " + 
producerInfoMap.keySet()); - logger.info(getName() + " initTopicSet cost: " + (System.currentTimeMillis() - startTime) + "ms"); - } + }); } @Override public void start() { + if (!this.started.compareAndSet(false, true)) { + logger.info("Duplicated call, " + getName() + " has started!"); + return; + } this.dimensions = new HashMap<>(); this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); // register metrics this.metricItemSet = new DataProxyMetricItemSet(this.getName()); MetricRegister.register(metricItemSet); - // create tube connection try { - initCreateConnection(); + producerHolder.start(new HashSet<>(topicProperties.values())); } catch (FlumeException e) { - logger.error("Unable to create tube client" + ". Exception follows.", e); - // Try to prevent leaking resources - destroyConnection(); - // FIXME: Mark ourselves as failed - stop(); + logger.error("Unable to start TubeMQ client. Exception follows.", e); + super.stop(); return; } - // start the cleaner thread - super.start(); this.canSend = true; this.canTake = true; - - try { - initTopicSet(new HashSet<String>(topicProperties.values())); - } catch (Exception e) { - logger.info("meta sink start publish topic fail.", e); - } - for (int i = 0; i < sinkThreadPool.length; i++) { - sinkThreadPool[i] = new Thread(new SinkTask(), getName() + "_tube_sink_sender-" + i); + sinkThreadPool[i] = new Thread(new TubeSinkTask(), + getName() + "_tube_sink_sender-" + i); sinkThreadPool[i].start(); } - + logger.info(getName() + " started!"); } - /** - * resend event - */ - private void resendEvent(EventStat es, boolean isDecrement) { - try { - if (es == null || es.getEvent() == null) { - return; + @Override + public void stop() { + if (!this.started.compareAndSet(true, false)) { + logger.info("Duplicated call, " + getName() + " has stopped!"); + return; + } + this.canTake = false; + if (sinkThreadPool != null) { + for (Thread thread : 
sinkThreadPool) { + if (thread == null) { + continue; + } + thread.interrupt(); } - msgDedupHandler.invalidMsgSeqId(es.getEvent() - .getHeaders().get(ConfigConstants.SEQUENCE_ID)); - } catch (Throwable throwable) { - logger.error(getName() + " Discard msg because put events to both of queue and " - + "fileChannel fail,current resendQueue.size = " - + resendQueue.size(), throwable); } + if (producerHolder != null) { + producerHolder.stop(); + } + super.stop(); + logger.info(getName() + " stopped!"); } @Override @@ -442,19 +231,20 @@ public class TubeSink extends AbstractSink implements Configurable { } else { dimensions = getNewDimension(DataProxyMetricItem.KEY_SINK_DATA_ID, ""); } - if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) { - logger.info("[{}] Channel --> Queue(has no enough space,current code point) " - + "--> Tube,Check if Tube server or network is ok.(if this situation last long time " - + "it will cause memoryChannel full and fileChannel write.)", getName()); - tx.rollback(); - // metric + if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) { + tx.commit(); + cachedMsgCnt.incrementAndGet(); DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readFailCount.incrementAndGet(); + metricItem.readSuccessCount.incrementAndGet(); metricItem.readFailSize.addAndGet(event.getBody().length); } else { - tx.commit(); + tx.rollback(); + //logger.info("[{}] Channel --> Queue(has no enough space,current code point) " + // + "--> TubeMQ, check if TubeMQ server or network is ok.(if this situation last long time " + // + "it will cause memoryChannel full and fileChannel write.)", getName()); + // metric DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions); - metricItem.readSuccessCount.incrementAndGet(); + metricItem.readFailCount.incrementAndGet(); metricItem.readFailSize.addAndGet(event.getBody().length); } } else { @@ -475,196 +265,116 @@ public class TubeSink extends 
AbstractSink implements Configurable { return status; } - @Override - public void configure(Context context) { - logger.info("configure from context: {}", context); - - configManager = ConfigManager.getInstance(); - topicProperties = configManager.getTopicProperties(); - masterHostAndPortLists = configManager.getMqClusterUrl2Token().keySet(); - tubeConfig = configManager.getMqClusterConfig(); - configManager.getTopicConfig().addUpdateCallback(new ConfigUpdateCallback() { - @Override - public void update() { - diffSetPublish(new HashSet<>(topicProperties.values()), - new HashSet<>(configManager.getTopicProperties().values())); - } - }); - configManager.getMqClusterHolder().addUpdateCallback(new ConfigUpdateCallback() { - @Override - public void update() { - diffUpdateTubeClient(masterHostAndPortLists, configManager.getMqClusterUrl2Token().keySet()); - } - }); - - producerInfoMap = new ConcurrentHashMap<>(); - masterUrl2producers = new ConcurrentHashMap<>(); - // start message deduplication handler - msgDedupHandler.start(tubeConfig.getClientIdCache(), - tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize()); - - int badEventQueueSize = tubeConfig.getBadEventQueueSize(); - Preconditions.checkArgument(badEventQueueSize > 0, "badEventQueueSize must be > 0"); - resendQueue = new LinkedBlockingQueue<>(badEventQueueSize); - - int threadNum = tubeConfig.getThreadNum(); - Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0"); - sinkThreadPool = new Thread[threadNum]; - int eventQueueSize = tubeConfig.getEventQueueSize(); - Preconditions.checkArgument(eventQueueSize > 0, "eventQueueSize must be > 0"); - eventQueue = new LinkedBlockingQueue<>(eventQueueSize); - - if (tubeConfig.getDiskIoRatePerSec() != 0) { - diskRateLimiter = RateLimiter.create(tubeConfig.getDiskIoRatePerSec()); - } - - } - - private Map<String, String> getNewDimension(String otherKey, String value) { - Map<String, String> dimensions = new HashMap<>(); - 
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy"); - dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName()); - dimensions.put(otherKey, value); - return dimensions; - } - - /** - * get metricItemSet - * - * @return the metricItemSet - */ - public DataProxyMetricItemSet getMetricItemSet() { - return metricItemSet; - } - - class SinkTask implements Runnable { - - private void sendMessage(MessageProducer producer, Event event, - String topic, AtomicBoolean flag, EventStat es) - throws TubeClientException, InterruptedException { - if (msgDedupHandler.judgeDupAndPutMsgSeqId( - event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) { - logger.info("{} agent package {} existed,just discard.", - getName(), event.getHeaders().get(ConfigConstants.SEQUENCE_ID)); - } else { - Message message = new Message(topic, event.getBody()); - message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp()); - String streamId = ""; - if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { - streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); - } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { - streamId = event.getHeaders().get(AttributeConstants.INAME); - } - message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); - producer.sendMessage(message, new MyCallback(es)); - flag.set(true); - } - illegalTopicMap.remove(topic); - } - - private void handleException(Throwable t, String topic, boolean decrementFlag, EventStat es) { - if (t instanceof TubeClientException) { - String message = t.getMessage(); - if (message != null && (message.contains("No available queue for topic") - || message.contains("The brokers of topic are all forbidden"))) { - illegalTopicMap.put(topic, System.currentTimeMillis() + 60 * 1000); - logger.info("IllegalTopicMap.put " + topic); - return; - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - //ignore.. 
-                    }
-                }
-            }
-            logger.error("Sink task fail to send the message, decrementFlag=" + decrementFlag + ",sink.name="
-                    + Thread.currentThread().getName()
-                    + ",event.headers=" + es.getEvent().getHeaders(), t);
+    private class TubeSinkTask implements Runnable {
+        public TubeSinkTask() {
+            // ignore
         }
 
+        /**
+         * Sink worker loop: take one event per round (resend queue first, then the
+         * event queue with a 2s poll timeout), discard it when its topic is blank,
+         * otherwise hand it to sendMessage(). Any Throwable other than an interrupt
+         * sends the event back through resendEvent(). The loop ends when canSend is
+         * false, on interrupt, or when canTake is false and no taken message remains.
+         */
         @Override
         public void run() {
+            Event event = null;
+            EventStat es = null;
+            String topic = null;
+            boolean sendFinished = false;
             logger.info("sink task {} started.", Thread.currentThread().getName());
             while (canSend) {
-                boolean decrementFlag = false;
-                boolean resendBadEvent = false;
-                Event event = null;
-                EventStat es = null;
-                String topic = null;
                 try {
-                    if (TubeSink.this.overflow) {
-                        TubeSink.this.overflow = false;
-                        Thread.sleep(10);
+                    if (isOverFlow) {
+                        isOverFlow = false;
+                        Thread.sleep(30);
                     }
+                    // reset per-round state, so the catch block below never acts on
+                    // the event or the send result left over from a previous round
+                    event = null;
+                    es = null;
+                    topic = null;
+                    sendFinished = false;
+                    // get event from queues
                     if (!resendQueue.isEmpty()) {
                         es = resendQueue.poll();
-                        if (es != null) {
-                            event = es.getEvent();
-                            // logger.warn("Resend event: {}", event.toString());
-                            if (event.getHeaders().containsKey(TOPIC)) {
-                                topic = event.getHeaders().get(TOPIC);
-                            }
-                            resendBadEvent = true;
+                        if (es == null) {
+                            continue;
+                        }
+                        resendMsgCnt.decrementAndGet();
+                        event = es.getEvent();
+                        if (event.getHeaders().containsKey(TOPIC)) {
+                            topic = event.getHeaders().get(TOPIC);
                         }
                     } else {
-                        event = eventQueue.take();
+                        event = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
+                        if (event == null) {
+                            if (!canTake && takenMsgCnt.get() <= 0) {
+                                logger.info("Found canTake is false and taken message count is zero, break!");
+                                break;
+                            }
+                            continue;
+                        }
+                        cachedMsgCnt.decrementAndGet();
+                        takenMsgCnt.incrementAndGet();
                         es = new EventStat(event);
-//                        sendCnt.incrementAndGet();
                         if (event.getHeaders().containsKey(TOPIC)) {
                             topic = event.getHeaders().get(TOPIC);
                         }
                     }
-
-                    if (event == null) {
-                        // ignore event is null, when multiple-thread SinkTask running
-                        // this null value comes from resendQueue
-                        continue;
-                    }
-
-                    if (topic == null || topic.equals("")) {
-                        logger.warn("no topic specified in event header, just skip this event");
-                        continue;
-                    }
-
-                    Long expireTime = illegalTopicMap.get(topic);
-                    if (expireTime != null) {
-                        long currentTime = System.currentTimeMillis();
-                        if (expireTime > currentTime) {
-                            // TODO: need to be improved.
-//                            reChannelEvent(es, topic);
-                            continue;
-                        } else {
-                            illegalTopicMap.remove(topic);
+                    // valid event status
+                    if (StringUtils.isBlank(topic)) {
+                        blankTopicDiscardMsgCnt.incrementAndGet();
+                        takenMsgCnt.decrementAndGet();
+                        if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                            logger.error("No topic specified, just discard the event, event header is "
+                                    + event.getHeaders().toString());
                         }
-                    }
-                    MessageProducer producer = null;
-                    try {
-                        producer = getProducer(topic);
-                    } catch (Exception e) {
-                        logger.error("Get producer failed!", e);
-                    }
-
-                    if (producer == null) {
-                        illegalTopicMap.put(topic, System.currentTimeMillis() + 30 * 1000);
                         continue;
                     }
-
-                    AtomicBoolean flagAtomic = new AtomicBoolean(decrementFlag);
-                    sendMessage(producer, event, topic, flagAtomic, es);
-                    decrementFlag = flagAtomic.get();
+                    // send message
+                    sendFinished = sendMessage(es, event, topic);
                 } catch (InterruptedException e) {
                     logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
                     return;
                 } catch (Throwable t) {
-                    handleException(t, topic, decrementFlag, es);
-                    resendEvent(es, decrementFlag);
+                    resendEvent(es, sendFinished);
+                    if (t instanceof TubeClientException) {
+                        String message = t.getMessage();
+                        if (message != null && (message.contains("No available queue for topic")
+                                || message.contains("The brokers of topic are all forbidden"))) {
+                            isOverFlow = true;
+                        }
+                    }
+                    if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                        // event may still be null here; never let the log line itself throw
+                        logger.error("Sink task fail to send the message, finished =" + sendFinished
+                                + ",sink.name=" + Thread.currentThread().getName()
+                                + ",event.headers=" + (event == null ? null : event.getHeaders()), t);
+                    }
                 }
             }
+            logger.info("sink task {} stopped!",
Thread.currentThread().getName()); + } + + private boolean sendMessage(EventStat es, Event event, String topic) throws Exception { + MessageProducer producer = producerHolder.getProducer(topic); + if (producer == null) { + frozenTopicDiscardMsgCnt.incrementAndGet(); + takenMsgCnt.decrementAndGet(); + if (LOG_SINK_TASK_PRINTER.shouldPrint()) { + logger.error("Get producer failed for " + topic); + } + return false; + } + if (MSG_DEDUP_HANDLER.judgeDupAndPutMsgSeqId( + event.getHeaders().get(ConfigConstants.SEQUENCE_ID))) { + dupDiscardMsgCnt.incrementAndGet(); + takenMsgCnt.decrementAndGet(); + logger.info("{} agent package {} existed,just discard.", + Thread.currentThread().getName(), + event.getHeaders().get(ConfigConstants.SEQUENCE_ID)); + return false; + } else { + producer.sendMessage(TubeUtils.buildMessage( + topic, event, false), new MyCallback(es)); + inflightMsgCnt.incrementAndGet(); + return true; + } } } - public class MyCallback implements MessageSentCallback { + private class MyCallback implements MessageSentCallback { private EventStat myEventStat; private long sendTime; @@ -677,7 +387,9 @@ public class TubeSink extends AbstractSink implements Configurable { @Override public void onMessageSent(final MessageSentResult result) { if (result.isSuccess()) { - // TODO: add stats + successMsgCnt.incrementAndGet(); + inflightMsgCnt.decrementAndGet(); + takenMsgCnt.decrementAndGet(); this.addMetric(myEventStat.getEvent(), true, sendTime); } else { this.addMetric(myEventStat.getEvent(), false, 0); @@ -685,10 +397,9 @@ public class TubeSink extends AbstractSink implements Configurable { logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", result.getErrMsg(), resendQueue.size(), myEventStat.getEvent().hashCode()); - return; - } - if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW) { + } else if (result.getErrCode() != TErrCodeConstants.SERVER_RECEIVE_OVERFLOW + && LOG_SINK_TASK_PRINTER.shouldPrint()) { 
logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}", result.getErrMsg(), resendQueue.size(), myEventStat.getEvent().hashCode()); @@ -731,65 +442,177 @@ public class TubeSink extends AbstractSink implements Configurable { @Override public void onException(final Throwable e) { - Throwable t = e; - while (t.getCause() != null) { - t = t.getCause(); - } - if (t instanceof OverflowException) { - TubeSink.this.overflow = true; - } resendEvent(myEventStat, true); } } - class TopicProducerInfo { - - private TubeMultiSessionFactory sessionFactory; - private MessageProducer producer; - private Set<String> topicSet; + private class TubeStatsTask implements Runnable { - public TopicProducerInfo(TubeMultiSessionFactory sessionFactory) { - this.sessionFactory = sessionFactory; + @Override + public void run() { + if (!canTake && takenMsgCnt.get() <= 0) { + return; + } + logger.info(getName() + "[TubeSink Stats] cachedMsgCnt=" + cachedMsgCnt.get() + + ", takenMsgCnt=" + takenMsgCnt.get() + + ", resendMsgCnt=" + resendMsgCnt.get() + + ", blankTopicDiscardMsgCnt=" + blankTopicDiscardMsgCnt.get() + + ", frozenTopicDiscardMsgCnt=" + frozenTopicDiscardMsgCnt.get() + + ", dupDiscardMsgCnt=" + dupDiscardMsgCnt.get() + + ", inflightMsgCnt=" + inflightMsgCnt.get() + + ", successMsgCnt=" + successMsgCnt.get()); } + } - public void shutdown() { - if (producer != null) { - try { - producer.shutdown(); - } catch (Throwable e) { - logger.error("destroy producer error in tube sink", e); + /** + * resend event + */ + private void resendEvent(EventStat es, boolean sendFinished) { + try { + if (sendFinished) { + inflightMsgCnt.decrementAndGet(); + } + if (es == null || es.getEvent() == null) { + takenMsgCnt.decrementAndGet(); + return; + } + MSG_DEDUP_HANDLER.invalidMsgSeqId(es.getEvent() + .getHeaders().get(ConfigConstants.SEQUENCE_ID)); + if (resendQueue.offer(es)) { + resendMsgCnt.incrementAndGet(); + } else { + 
FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+                takenMsgCnt.decrementAndGet();
+                if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                    logger.error(Thread.currentThread().getName()
+                            + " Channel --> Tube --> ResendQueue(full) -->"
+                            + "FailOverChannelProcessor(current code point),"
+                            + " Resend queue is full,Check if Tube server or network is ok.");
                 }
             }
+        } catch (Throwable throwable) {
+            takenMsgCnt.decrementAndGet();
+            if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
+                logger.error(getName() + " Discard msg because put events to both of queue and "
+                        + "fileChannel fail,current resendQueue.size = "
+                        + resendQueue.size(), throwable);
+            }
         }
+    }
 
-        public void initProducer() {
-            if (sessionFactory == null) {
-                logger.error("sessionFactory is null, can't create producer");
-                return;
+    private Map<String, String> getNewDimension(String otherKey, String value) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+        dimensions.put(otherKey, value);
+        return dimensions;
+    }
+
+    /**
+     * Differentiate unpublished topic sets and publish them
+     * attention: only append added topics; nothing is done while the sink
+     * is not started
+     *
+     * @param curTopicSet the current used topic set
+     * @param newTopicSet the latest configured topic set
+     */
+    private void diffSetPublish(Set<String> curTopicSet, Set<String> newTopicSet) {
+        if (!this.started.get()) {
+            logger.info(getName() + " not started, ignore this change!");
+            // really ignore the change: the producer holder is not usable before start
+            return;
+        }
+        if (SetUtils.isEqualSet(curTopicSet, newTopicSet)) {
+            return;
+        }
+        // filter unpublished topics
+        Set<String> addedTopics = new HashSet<>();
+        for (String topic : newTopicSet) {
+            if (StringUtils.isBlank(topic)) {
+                continue;
             }
-            try {
-                this.producer = sessionFactory.createProducer();
-            } catch (TubeClientException e) {
-                logger.error("create tube messageProducer error in tubesink, ex {}", e.getMessage());
+            if (!curTopicSet.contains(topic)) {
+
addedTopics.add(topic);
             }
         }
-
-        public Set<String> publishTopic(Set<String> topicSet) {
+        // publish them
+        if (!addedTopics.isEmpty()) {
             try {
-                this.topicSet = producer.publish(topicSet);
-            } catch (TubeClientException e) {
-                logger.info(getName() + " meta sink initTopicSet fail.", e);
+                producerHolder.createProducersByTopicSet(addedTopics);
+            } catch (Exception e) {
+                logger.info(getName() + "'s publish new topic set fail.", e);
             }
-            return this.topicSet;
+            logger.info(getName() + "'s topics set has changed, trigger diff publish for {}",
+                    addedTopics);
+            topicProperties = configManager.getTopicProperties();
         }
+    }
 
-        public MessageProducer getProducer() {
-            return producer;
+    /**
+     * When masterUrlLists change, update tubeClient
+     * Requirement: when switching the Master cluster,
+     * the DataProxy node must not do the data reporting service
+     *
+     * The switch is skipped when the sink is not started, when the new set is
+     * empty or equal to the current one, or when it still contains the master
+     * address in use. The current producer holder is only replaced after the
+     * new holder has started successfully.
+     *
+     * @param curClusterSet previous masterHostAndPortList set
+     * @param newClusterSet new masterHostAndPortList set
+     */
+    private void diffUpdateTubeClient(Set<String> curClusterSet,
+                                      Set<String> newClusterSet) {
+        if (!this.started.get()) {
+            logger.info(getName() + " not started, ignore this change!");
+            // really ignore the change: there is no running producer holder to swap
+            return;
+        }
+        if (newClusterSet == null || newClusterSet.isEmpty()
+                || SetUtils.isEqualSet(curClusterSet, newClusterSet)
+                || newClusterSet.contains(usedMasterAddr)) {
+            return;
+        }
+        String newMasterAddr = getFirstClusterAddr(newClusterSet);
+        if (newMasterAddr == null) {
+            return;
+        }
+        TubeProducerHolder newProducerHolder = new TubeProducerHolder(getName(),
+                newMasterAddr, configManager.getMqClusterConfig());
+        try {
+            newProducerHolder.start(new HashSet<>(configManager.getTopicProperties().values()));
+        } catch (Throwable e) {
+            logger.error(getName() + " create new producer holder for " + newMasterAddr
+                    + " failure, throw exception is {}", e.getMessage());
+            return;
         }
+        // replace current producer holder
+        final String tmpMasterAddr = usedMasterAddr;
+        TubeProducerHolder tmpProducerHolder = producerHolder;
+        producerHolder
= newProducerHolder; + usedMasterAddr = newMasterAddr; + // close old producer holder + tmpProducerHolder.stop(); + logger.info(getName() + " switch cluster from " + + tmpMasterAddr + " to " + usedMasterAddr); + } - public Set<String> getTopicSet() { - return this.topicSet; + /** + * Get first cluster address + * + * @param clusterSet cluster set configure + * @return the selected cluster address + * null if set is empty or if items are all blank + */ + private String getFirstClusterAddr(Set<String> clusterSet) { + String tmpMasterAddr = null; + for (String masterAddr : clusterSet) { + if (StringUtils.isBlank(masterAddr)) { + continue; + } + tmpMasterAddr = masterAddr; + break; } + return tmpMasterAddr; + } + + /** + * get metricItemSet + * + * @return the metricItemSet + */ + private DataProxyMetricItemSet getMetricItemSet() { + return metricItemSet; } } diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java index 297fbc769..ba3dec596 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/MsgDedupHandler.java @@ -19,6 +19,7 @@ package org.apache.inlong.dataproxy.sink.common; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; import com.google.common.cache.LoadingCache; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -95,4 +96,11 @@ public class MsgDedupHandler { } return "Disable for message data deduplication function"; } + + public CacheStats getCacheData() { + if (enableDataDedup) { + return msgSeqIdCache.stats(); + } + return null; + } } diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java new file mode 100644 index 000000000..c1852fed3 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.sink.common; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; +import org.apache.flume.FlumeException; +import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.inlong.tubemq.client.producer.MessageProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TubeProducerHolder { + private static final Logger logger = + LoggerFactory.getLogger(TubeProducerHolder.class); + private static final long SEND_FAILURE_WAIT = 30000L; + private static final long PUBLISH_FAILURE_WAIT = 60000L; + private final AtomicBoolean started = new AtomicBoolean(false); + private final String sinkName; + private final String clusterAddr; + private final MQClusterConfig clusterConfig; + private TubeMultiSessionFactory sessionFactory = null; + private final Map<String, MessageProducer> producerMap = new ConcurrentHashMap<>(); + private MessageProducer lastProducer = null; + private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0); + private static final ConcurrentHashMap<String, AtomicLong> FROZEN_TOPIC_MAP + = new ConcurrentHashMap<>(); + + public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig tubeConfig) { + Preconditions.checkState(StringUtils.isNotBlank(clusterAddr), + "No TubeMQ's cluster address list specified"); + this.sinkName = sinkName; + this.clusterAddr = 
clusterAddr;
+        this.clusterConfig = tubeConfig;
+    }
+
+    /**
+     * Start the holder: build the session factory for the cluster address and
+     * publish the configured topics. A second call on a started holder only logs.
+     *
+     * @param configTopicSet the topics to publish at startup
+     * @throws FlumeException if the session factory or the producers cannot be
+     *         built; the holder is stopped again before the exception is thrown
+     */
+    public void start(Set<String> configTopicSet) {
+        if (!this.started.compareAndSet(false, true)) {
+            logger.info("ProducerHolder for " + sinkName + " has started!");
+            return;
+        }
+        logger.info("ProducerHolder for " + sinkName + " begin to start!");
+        // create session factory
+        try {
+            TubeClientConfig clientConfig = TubeUtils.buildClientConfig(clusterAddr, this.clusterConfig);
+            this.sessionFactory = new TubeMultiSessionFactory(clientConfig);
+            createProducersByTopicSet(configTopicSet);
+        } catch (Throwable e) {
+            stop();
+            String errInfo = "Build session factory to " + clusterAddr
+                    + " for " + sinkName + " failure, please re-check";
+            logger.error(errInfo, e);
+            // keep the root cause attached for callers that inspect it
+            throw new FlumeException(errInfo, e);
+        }
+        logger.info("ProducerHolder for " + sinkName + " started!");
+    }
+
+    /**
+     * Stop the holder: shut down every producer and the session factory and
+     * clear the cached state. Only the caller that flips the started flag from
+     * true to false performs the shutdown; any other call just logs and returns.
+     */
+    public void stop() {
+        // change start flag; the CAS alone decides who performs the shutdown
+        if (!this.started.compareAndSet(true, false)) {
+            logger.info("ProducerHolder for " + sinkName + " has stopped!");
+            return;
+        }
+        logger.info("ProducerHolder for " + sinkName + " begin to stop!");
+        for (Map.Entry<String, MessageProducer> entry : producerMap.entrySet()) {
+            if (entry == null || entry.getValue() == null) {
+                continue;
+            }
+            try {
+                entry.getValue().shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+        }
+        producerMap.clear();
+        lastProducer = null;
+        lastPubTopicCnt.set(0);
+        FROZEN_TOPIC_MAP.clear();
+        if (sessionFactory != null) {
+            try {
+                sessionFactory.shutdown();
+            } catch (Throwable e) {
+                // ignore log
+            }
+            sessionFactory = null;
+        }
+        logger.info("ProducerHolder for " + sinkName + " finished stop!");
+    }
+
+    /**
+     * Get producer by topic name:
+     * i. if the topic is judged to be an illegal topic, return null;
+     * ii. if it is not an illegal topic or the status has expired, check:
+     * a. if the topic has been published before, return the corresponding producer directly;
+     * b.
if the topic is not in the published list, perform the topic's publish action. + * If the topic is thrown exception during the publishing process, + * set the topic to an illegal topic + * + * @param topicName the topic name + * + * @return the producer + * if topic is illegal, return null + * @throws TubeClientException + */ + public MessageProducer getProducer(String topicName) throws TubeClientException { + AtomicLong fbdTime = FROZEN_TOPIC_MAP.get(topicName); + if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) { + return null; + } + MessageProducer tmpProducer = producerMap.get(topicName); + if (tmpProducer != null) { + if (fbdTime != null) { + FROZEN_TOPIC_MAP.remove(topicName); + } + return tmpProducer; + } + synchronized (lastPubTopicCnt) { + fbdTime = FROZEN_TOPIC_MAP.get(topicName); + if (fbdTime != null && fbdTime.get() > System.currentTimeMillis()) { + return null; + } + if (lastProducer == null + || lastPubTopicCnt.get() >= clusterConfig.getMaxTopicsEachProducerHold()) { + lastProducer = sessionFactory.createProducer(); + lastPubTopicCnt.set(0); + } + try { + lastProducer.publish(topicName); + } catch (Throwable e) { + fbdTime = FROZEN_TOPIC_MAP.get(topicName); + if (fbdTime == null) { + AtomicLong tmpFbdTime = new AtomicLong(); + fbdTime = FROZEN_TOPIC_MAP.putIfAbsent(topicName, tmpFbdTime); + if (fbdTime == null) { + fbdTime = tmpFbdTime; + } + } + fbdTime.set(System.currentTimeMillis() + PUBLISH_FAILURE_WAIT); + logger.warn("Throw exception while publish topic=" + + topicName + ", exception is " + e.getMessage()); + return null; + } + producerMap.put(topicName, lastProducer); + lastPubTopicCnt.incrementAndGet(); + return lastProducer; + } + } + + /** + * Whether frozen production according to the exceptions returned by message sending + * + * @param topicName the topic name sent message + * @param throwable the exception information thrown when sending a message + * + * @return whether illegal topic + */ + public boolean 
needFrozenSent(String topicName, Throwable throwable) { + if (throwable instanceof TubeClientException) { + String message = throwable.getMessage(); + if (message != null && (message.contains("No available partition for topic") + || message.contains("The brokers of topic are all forbidden"))) { + AtomicLong fbdTime = FROZEN_TOPIC_MAP.get(topicName); + if (fbdTime == null) { + AtomicLong tmpFbdTime = new AtomicLong(0); + fbdTime = FROZEN_TOPIC_MAP.putIfAbsent(topicName, tmpFbdTime); + if (fbdTime == null) { + fbdTime = tmpFbdTime; + } + } + fbdTime.set(System.currentTimeMillis() + SEND_FAILURE_WAIT); + return true; + } + } + return false; + } + + /** + * Create sink producers by configured topic set + * group topicSet to different group, each group is associated with a producer + * + * @param cfgTopicSet the configured topic set + */ + public void createProducersByTopicSet(Set<String> cfgTopicSet) throws Exception { + if (cfgTopicSet == null || cfgTopicSet.isEmpty()) { + return; + } + // filter published topics + List<String> filteredTopics = new ArrayList<>(cfgTopicSet.size()); + for (String topicName : cfgTopicSet) { + if (StringUtils.isBlank(topicName) + || producerMap.get(topicName) != null) { + continue; + } + filteredTopics.add(topicName); + } + if (filteredTopics.isEmpty()) { + return; + } + // alloc topic count + Collections.sort(filteredTopics); + long startTime = System.currentTimeMillis(); + int maxPublishTopicCnt = clusterConfig.getMaxTopicsEachProducerHold(); + int allocTotalCnt = filteredTopics.size(); + List<Integer> topicGroupCnt = new ArrayList<>(); + int paddingCnt = (lastPubTopicCnt.get() <= 0) + ? 
0 : (maxPublishTopicCnt - lastPubTopicCnt.get()); + while (allocTotalCnt > 0) { + if (paddingCnt > 0) { + topicGroupCnt.add(Math.min(allocTotalCnt, paddingCnt)); + allocTotalCnt -= paddingCnt; + paddingCnt = 0; + } else { + topicGroupCnt.add(Math.min(allocTotalCnt, maxPublishTopicCnt)); + allocTotalCnt -= maxPublishTopicCnt; + } + } + // create producer + int startPos = 0; + int endPos = 0; + Set<String> subTopicSet = new HashSet<>(); + for (Integer dltCnt : topicGroupCnt) { + // allocate topic items + subTopicSet.clear(); + endPos = startPos + dltCnt; + for (int index = startPos; index < endPos; index++) { + subTopicSet.add(filteredTopics.get(index)); + } + startPos = endPos; + // create producer + if (lastProducer == null + || lastPubTopicCnt.get() == maxPublishTopicCnt) { + lastProducer = sessionFactory.createProducer(); + lastPubTopicCnt.set(0); + } + lastProducer.publish(subTopicSet); + lastPubTopicCnt.addAndGet(subTopicSet.size()); + for (String topicItem : subTopicSet) { + producerMap.put(topicItem, lastProducer); + } + } + logger.info(sinkName + " initializes producers for topics:" + + producerMap.keySet() + ", cost: " + (System.currentTimeMillis() - startTime) + + "ms"); + } + +} diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java new file mode 100644 index 000000000..6a67978f8 --- /dev/null +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.dataproxy.sink.common; + +import java.util.Map; +import org.apache.flume.Event; +import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; +import org.apache.inlong.dataproxy.consts.AttributeConstants; +import org.apache.inlong.dataproxy.consts.ConfigConstants; +import org.apache.inlong.dataproxy.utils.Constants; +import org.apache.inlong.dataproxy.utils.NetworkUtils; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.corebase.Message; + +public class TubeUtils { + + /** + * Build TubeMQ's client configure + * + * @param clusterAddr the TubeMQ cluster address + * @param tubeConfig the TubeMQ cluster configure + * @return the TubeClientConfig object + */ + public static TubeClientConfig buildClientConfig(String clusterAddr, MQClusterConfig tubeConfig) { + final TubeClientConfig tubeClientConfig = new TubeClientConfig(clusterAddr); + tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(tubeConfig.getLinkMaxAllowedDelayedMsgCount()); + tubeClientConfig.setSessionWarnDelayedMsgCount(tubeConfig.getSessionWarnDelayedMsgCount()); + tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(tubeConfig.getSessionMaxAllowedDelayedMsgCount()); + tubeClientConfig.setNettyWriteBufferHighWaterMark(tubeConfig.getNettyWriteBufferHighWaterMark()); + tubeClientConfig.setHeartbeatPeriodMs(tubeConfig.getTubeHeartbeatPeriodMs()); + 
tubeClientConfig.setRpcTimeoutMs(tubeConfig.getTubeRpcTimeoutMs()); + return tubeClientConfig; + } + + /** + * Build TubeMQ's message + * + * @param topicName the topic name of message + * @param event the DataProxy event + * @param addExtraAttrs whether to add extra attributes + * @return the message object + */ + public static Message buildMessage(String topicName, + Event event, boolean addExtraAttrs) { + Message message = new Message(topicName, event.getBody()); + message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp()); + String streamId = ""; + if (event.getHeaders().containsKey(AttributeConstants.STREAM_ID)) { + streamId = event.getHeaders().get(AttributeConstants.STREAM_ID); + } else if (event.getHeaders().containsKey(AttributeConstants.INAME)) { + streamId = event.getHeaders().get(AttributeConstants.INAME); + } + message.putSystemHeader(streamId, event.getHeaders().get(ConfigConstants.PKG_TIME_KEY)); + if (addExtraAttrs) { + // common attributes + Map<String, String> headers = event.getHeaders(); + message.setAttrKeyVal(Constants.INLONG_GROUP_ID, headers.get(Constants.INLONG_GROUP_ID)); + message.setAttrKeyVal(Constants.INLONG_STREAM_ID, headers.get(Constants.INLONG_STREAM_ID)); + message.setAttrKeyVal(Constants.TOPIC, headers.get(Constants.TOPIC)); + message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, headers.get(Constants.HEADER_KEY_MSG_TIME)); + message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, headers.get(Constants.HEADER_KEY_SOURCE_IP)); + } + return message; + } +}