This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d9dc65e83 [INLONG-6985][DataProxy] Make maxMonitorCnt setting configurable (#6990) d9dc65e83 is described below commit d9dc65e836104f47e009f9eb11d380f1d1169e1f Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Dec 21 09:50:12 2022 +0800 [INLONG-6985][DataProxy] Make maxMonitorCnt setting configurable (#6990) --- .../conf/dataproxy-mulit-pulsar-http-example.conf | 7 ++++++- .../conf/dataproxy-mulit-pulsar-udp-example.conf | 5 +++++ inlong-dataproxy/conf/dataproxy-tubemq.conf | 12 ++++++++++++ inlong-dataproxy/conf/dataproxy.conf | 7 +++++++ .../apache/inlong/dataproxy/consts/ConfigConstants.java | 1 + .../org/apache/inlong/dataproxy/http/HttpBaseSource.java | 13 +++++++++---- .../java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 14 ++++++++++---- .../java/org/apache/inlong/dataproxy/sink/TubeSink.java | 14 ++++++++++---- .../org/apache/inlong/dataproxy/source/BaseSource.java | 11 ++++++++++- 9 files changed, 70 insertions(+), 14 deletions(-) diff --git a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf index b744ecefb..56d2d7a85 100644 --- a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf +++ b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-http-example.conf @@ -46,6 +46,7 @@ agent1.sources.http-source.metric-recovery-path = ./data/file/recovery agent1.sources.http-source.metric-agent-port=8003 agent1.sources.http-source.metric-cache-size=1000000 agent1.sources.http-source.set=10 +agent1.sources.http-source.max-monitor-cnt=500000 agent1.channels.ch-msg1.type = memory agent1.channels.ch-msg1.capacity = 10000 @@ -77,12 +78,16 @@ agent1.channels.ch-msg6.fsyncInterval = 10 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg1.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg2.channel = 
ch-msg2 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg2.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg5.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6 -agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink \ No newline at end of file +agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg6.max-monitor-cnt=500000 \ No newline at end of file diff --git a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf index 018b02bc1..827db35a1 100644 --- a/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf +++ b/inlong-dataproxy/conf/dataproxy-mulit-pulsar-udp-example.conf @@ -46,6 +46,7 @@ agent1.sources.upd-source.metric-recovery-path = ./data/file/recovery agent1.sources.upd-source.metric-agent-port = 8003 agent1.sources.upd-source.metric-cache-size = 1000000 agent1.sources.upd-source.set = 10 +agent1.sources.upd-source.max-monitor-cnt=500000 agent1.channels.ch-msg1.type = memory agent1.channels.ch-msg1.capacity = 10000 @@ -77,12 +78,16 @@ agent1.channels.ch-msg6.fsyncInterval = 10 agent1.sinks.pulsar-sink-msg1.channel = ch-msg1 agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg1.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg2.channel = ch-msg2 agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg2.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg5.channel = ch-msg5 agent1.sinks.pulsar-sink-msg5.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg5.max-monitor-cnt=500000 agent1.sinks.pulsar-sink-msg6.channel = ch-msg6 
agent1.sinks.pulsar-sink-msg6.type = org.apache.inlong.dataproxy.sink.PulsarSink +agent1.sinks.pulsar-sink-msg6.max-monitor-cnt=500000 diff --git a/inlong-dataproxy/conf/dataproxy-tubemq.conf b/inlong-dataproxy/conf/dataproxy-tubemq.conf index 67f2e7fcf..9e4180165 100644 --- a/inlong-dataproxy/conf/dataproxy-tubemq.conf +++ b/inlong-dataproxy/conf/dataproxy-tubemq.conf @@ -51,6 +51,7 @@ agent1.sources.tcp-source.set=10 agent1.sources.tcp-source.old-metric-on=true agent1.sources.tcp-source.new-metric-on=true agent1.sources.tcp-source.metric_topic_prefix=manager_tmertic +agent1.sources.tcp-source.max-monitor-cnt=500000 # http-source agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg3 ch-msg5 ch-msg6 ch-msg7 ch-msg8 ch-msg9 ch-msg10 ch-back @@ -76,6 +77,7 @@ agent1.sources.http-source.set=10 agent1.sources.http-source.old-metric-on=true agent1.sources.http-source.new-metric-on=true agent1.sources.http-source.metric_topic_prefix=manager_tmertic +agent1.sources.http-source.max-monitor-cnt=500000 agent1.channels.ch-back.type = memory agent1.channels.ch-back.capacity = 10000000 @@ -154,39 +156,49 @@ agent1.channels.ch-msg10.fsyncInterval = 5 agent1.sinks.meta-sink-msg1.channel = ch-msg1 agent1.sinks.meta-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg1.maxThreads = 1 +agent1.sinks.meta-sink-msg1.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg2.channel = ch-msg2 agent1.sinks.meta-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg2.maxThreads = 1 +agent1.sinks.meta-sink-msg2.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg3.channel = ch-msg3 agent1.sinks.meta-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg3.maxThreads = 1 +agent1.sinks.meta-sink-msg3.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg5.channel = ch-msg5 agent1.sinks.meta-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink 
agent1.sinks.meta-sink-msg5.maxThreads = 1 +agent1.sinks.meta-sink-msg5.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg6.channel = ch-msg6 agent1.sinks.meta-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg6.maxThreads = 1 +agent1.sinks.meta-sink-msg6.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg7.channel = ch-msg7 agent1.sinks.meta-sink-msg7.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg7.maxThreads = 1 +agent1.sinks.meta-sink-msg7.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg8.channel = ch-msg8 agent1.sinks.meta-sink-msg8.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg8.maxThreads = 1 +agent1.sinks.meta-sink-msg8.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg9.channel = ch-msg9 agent1.sinks.meta-sink-msg9.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg9.maxThreads = 1 +agent1.sinks.meta-sink-msg9.max-monitor-cnt=500000 agent1.sinks.meta-sink-msg10.channel = ch-msg10 agent1.sinks.meta-sink-msg10.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-msg10.maxThreads = 1 +agent1.sinks.meta-sink-msg10.max-monitor-cnt=500000 agent1.sinks.meta-sink-back.channel = ch-back agent1.sinks.meta-sink-back.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.meta-sink-back.maxThreads = 1 +agent1.sinks.meta-sink-back.max-monitor-cnt=500000 \ No newline at end of file diff --git a/inlong-dataproxy/conf/dataproxy.conf b/inlong-dataproxy/conf/dataproxy.conf index d815193e3..c44e4525d 100644 --- a/inlong-dataproxy/conf/dataproxy.conf +++ b/inlong-dataproxy/conf/dataproxy.conf @@ -58,6 +58,7 @@ agent1.sources.tcp-source.metric-recovery-path = ./data/file/recovery agent1.sources.tcp-source.metric-agent-port = 8003 agent1.sources.tcp-source.metric-cache-size = 1000000 agent1.sources.tcp-source.set = 10 
+agent1.sources.tcp-source.max-monitor-cnt=500000 # http-source agent1.sources.http-source.channels = ch-msg1 ch-msg2 ch-msg5 ch-msg6 @@ -80,6 +81,7 @@ agent1.sources.http-source.metric-recovery-path = ./data/file/recovery agent1.sources.http-source.metric-agent-port=8003 agent1.sources.http-source.metric-cache-size=1000000 agent1.sources.http-source.set=10 +agent1.sources.http-source.max-monitor-cnt=500000 agent1.channels.ch-msg1.type = memory agent1.channels.ch-msg1.capacity = 50000 @@ -118,20 +120,25 @@ agent1.channels.ch-msg6.fsyncInterval = 10 agent1.sinks.mq-sink-msg1.channel = ch-msg1 agent1.sinks.mq-sink-msg1.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.mq-sink-msg1.maxThreads = 1 +agent1.sinks.mq-sink-msg1.max-monitor-cnt=500000 agent1.sinks.mq-sink-msg2.channel = ch-msg2 agent1.sinks.mq-sink-msg2.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.mq-sink-msg2.maxThreads = 1 +agent1.sinks.mq-sink-msg2.max-monitor-cnt=500000 # For order message agent1.sinks.mq-sink-msg3.channel = ch-msg3 agent1.sinks.mq-sink-msg3.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.mq-sink-msg3.maxThreads = 1 +agent1.sinks.mq-sink-msg3.max-monitor-cnt=500000 agent1.sinks.mq-sink-msg5.channel = ch-msg5 agent1.sinks.mq-sink-msg5.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.mq-sink-msg5.maxThreads = 1 +agent1.sinks.mq-sink-msg5.max-monitor-cnt=500000 agent1.sinks.mq-sink-msg6.channel = ch-msg6 agent1.sinks.mq-sink-msg6.type = org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink agent1.sinks.mq-sink-msg6.maxThreads = 1 +agent1.sinks.mq-sink-msg6.max-monitor-cnt=500000 diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java index fea9e0238..f736313e3 100644 --- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java @@ -61,6 +61,7 @@ public class ConfigConstants { public static final String STAT_INTERVAL_SEC = "stat-interval-sec"; public static final String MAX_MONITOR_CNT = "max-monitor-cnt"; + public static final int DEF_MONITOR_STAT_CNT = 300000; public static final String HEART_INTERVAL_SEC = "heart-interval-sec"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java index 973bc128c..7faa75d64 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java @@ -17,7 +17,6 @@ package org.apache.inlong.dataproxy.http; -import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; import com.google.common.base.Preconditions; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; @@ -54,7 +53,7 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource, protected MonitorIndex monitorIndex = null; protected MonitorIndexExt monitorIndexExt = null; private int statIntervalSec = 60; - private int maxMonitorCnt = 300000; + private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT; // audit protected DataProxyMetricItemSet metricItemSet; @@ -137,8 +136,14 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource, // get statistic interval statIntervalSec = context.getInteger(ConfigConstants.STAT_INTERVAL_SEC, 60); Preconditions.checkArgument((statIntervalSec >= 0), "statIntervalSec must be >= 0"); - // get max monitor record count - maxMonitorCnt = 
context.getInteger(MAX_MONITOR_CNT, 300000); + // get maxMonitorCnt's configure value + try { + maxMonitorCnt = context.getInteger( + ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT); + } catch (NumberFormatException e) { + logger.warn("Property {} must specify an integer value: {}", + ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT)); + } Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0"); customProcessor = context.getBoolean(ConfigConstants.CUSTOM_CHANNEL_PROCESSOR, false); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java index 9891987a2..25cf65417 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java @@ -18,7 +18,6 @@ package org.apache.inlong.dataproxy.sink; import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG; -import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; @@ -149,7 +148,7 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag private RateLimiter diskRateLimiter; private long t1 = System.currentTimeMillis(); - private int maxMonitorCnt = 300000; + private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT; /* * Control whether the SinkRunner thread can read data from the Channel */ @@ -193,8 +192,15 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag @Override public void configure(Context context) { logger.info("PulsarSink started and context = {}", context.toString()); - maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000); - + // get 
maxMonitorCnt's configure value + try { + maxMonitorCnt = context.getInteger( + ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT); + } catch (NumberFormatException e) { + logger.warn("Property {} must specify an integer value: {}", + ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT)); + } + Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0"); configManager = ConfigManager.getInstance(); topicProperties = configManager.getTopicProperties(); pulsarCluster = configManager.getMqClusterUrl2Token(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java index fdd15c4c0..88da9d079 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java @@ -18,7 +18,6 @@ package org.apache.inlong.dataproxy.sink; import static org.apache.inlong.dataproxy.consts.AttrConstants.SEP_HASHTAG; -import static org.apache.inlong.dataproxy.consts.ConfigConstants.MAX_MONITOR_CNT; import com.google.common.base.Preconditions; import java.util.HashSet; @@ -84,7 +83,7 @@ public class TubeSink extends AbstractSink implements Configurable { private String usedMasterAddr = null; private Set<String> masterHostAndPortLists; // statistic info log - private int maxMonitorCnt = 300000; + private int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT; private int statIntervalSec = 60; private MonitorIndex monitorIndex; private MonitorIndexExt monitorIndexExt; @@ -137,8 +136,15 @@ public class TubeSink extends AbstractSink implements Configurable { // start message deduplication handler MSG_DEDUP_HANDLER.start(tubeConfig.getClientIdCache(), tubeConfig.getMaxSurvivedTime(), tubeConfig.getMaxSurvivedSize()); - // get statistic configure 
items - maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 300000); + // get maxMonitorCnt's configure value + try { + maxMonitorCnt = context.getInteger( + ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT); + } catch (NumberFormatException e) { + logger.warn("Property {} must specify an integer value: {}", + ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT)); + } + Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0"); statIntervalSec = tubeConfig.getStatIntervalSec(); Preconditions.checkArgument(statIntervalSec >= 0, "statIntervalSec must be >= 0"); // initial TubeMQ configure diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java index af17bc6ff..64612e79e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java @@ -118,7 +118,7 @@ public abstract class BaseSource private static String HOST_DEFAULT_VALUE = "0.0.0.0"; - private static int maxMonitorCnt = 300000; + private static int maxMonitorCnt = ConfigConstants.DEF_MONITOR_STAT_CNT; private static int DEFAULT_MAX_CONNECTIONS = 5000; @@ -285,6 +285,15 @@ public abstract class BaseSource logger.warn("Simple TCP Source max-threads property must specify an integer value. 
{}", context.getString(ConfigConstants.MAX_THREADS)); } + // get maxMonitorCnt's configure value + try { + maxMonitorCnt = context.getInteger( + ConfigConstants.MAX_MONITOR_CNT, ConfigConstants.DEF_MONITOR_STAT_CNT); + } catch (NumberFormatException e) { + logger.warn("Property {} must specify an integer value: {}", + ConfigConstants.MAX_MONITOR_CNT, context.getString(ConfigConstants.MAX_MONITOR_CNT)); + } + Preconditions.checkArgument(maxMonitorCnt >= 0, "maxMonitorCnt must be >= 0"); receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, RECEIVE_BUFFER_DEFAULT_SIZE); if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {