This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f43664dd13 Support server level consumption throttle (#12292)
f43664dd13 is described below
commit f43664dd1384086be6669aa76a3e08c2db8a11e1
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jan 23 22:34:36 2024 -0800
Support server level consumption throttle (#12292)
---
.../pinot/common/metrics/AbstractMetrics.java | 6 +
.../realtime/RealtimeConsumptionRateManager.java | 39 ++-
.../realtime/RealtimeSegmentDataManager.java | 10 +-
.../RealtimeConsumptionRateManagerTest.java | 49 +++-
...nsumptionRateLimiterClusterIntegrationTest.java | 298 +++++++++++++++++++++
.../server/starter/helix/BaseServerStarter.java | 5 +-
.../apache/pinot/spi/utils/CommonConstants.java | 5 +
7 files changed, 394 insertions(+), 18 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 6dccf789d1..456d6ecb14 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -337,6 +337,12 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName,
meter.getUnit(), TimeUnit.SECONDS);
}
+ public PinotMeter getMeteredValue(final M meter) {
+ final PinotMetricName metricName =
+ PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix +
meter.getMeterName());
+ return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName,
meter.getUnit(), TimeUnit.SECONDS);
+ }
+
private String getTableFullMeterName(final String tableName, final M meter) {
String meterName = meter.getMeterName();
return _metricPrefix + getTableName(tableName) + "." + meterName;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index bc5d16e915..c866ce2654 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
@@ -31,19 +30,26 @@ import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class is responsible for creating realtime consumption rate limiters.
The rate limit, specified in
- * StreamConfig of table config, is for the entire topic. The effective rate
limit for each partition is simply the
- * specified rate limit divided by the partition count.
+ * This class is responsible for creating realtime consumption rate limiters.
+ * It contains one rate limiter for the entire server and multiple table
partition level rate limiters.
+ * Server rate limiter is used to throttle the overall consumption rate of the
server and configured via
+ * cluster or server config.
+ * For table partition level rate limiter, the rate limit value specified in
StreamConfig of table config, is for the
+ * entire topic. The effective rate limit for each partition is simply the
specified rate limit divided by the
+ * partition count.
* This class leverages a cache for storing partition count for different
topics as retrieving partition count from
* stream is a bit expensive and also the same count will be used of all
partition consumers of the same topic.
*/
@@ -51,6 +57,10 @@ public class RealtimeConsumptionRateManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeConsumptionRateManager.class);
private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10;
+ private static final String SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME =
+ ServerMeter.REALTIME_ROWS_CONSUMED.getMeterName();
+ private ConsumptionRateLimiter _serverRateLimiter = NOOP_RATE_LIMITER;
+
// stream config object is required for fetching the partition count from
the stream
private final LoadingCache<StreamConfig, Integer>
_streamConfigToTopicPartitionCountMap;
private volatile boolean _isThrottlingAllowed = false;
@@ -73,9 +83,28 @@ public class RealtimeConsumptionRateManager {
_isThrottlingAllowed = true;
}
+ public ConsumptionRateLimiter createServerRateLimiter(PinotConfiguration
serverConfig, ServerMetrics serverMetrics) {
+ double serverRateLimit =
+
serverConfig.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+ CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT);
+ if (serverRateLimit <= 0) {
+ LOGGER.warn("Invalid server consumption rate limit: {}, throttling is
disabled", serverRateLimit);
+ _serverRateLimiter = NOOP_RATE_LIMITER;
+ } else {
+ LOGGER.info("A server consumption rate limiter is set up with rate
limit: {}", serverRateLimit);
+ MetricEmitter metricEmitter = new MetricEmitter(serverMetrics,
SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME);
+ _serverRateLimiter = new RateLimiterImpl(serverRateLimit, metricEmitter);
+ }
+ return _serverRateLimiter;
+ }
+
+ public ConsumptionRateLimiter getServerRateLimiter() {
+ return _serverRateLimiter;
+ }
+
public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig,
String tableName,
ServerMetrics serverMetrics, String metricKeyName) {
- if (!streamConfig.getTopicConsumptionRateLimit().isPresent()) {
+ if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) {
return NOOP_RATE_LIMITER;
}
int partitionCount;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 282f3c72a1..d64e85fada 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -298,7 +298,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final boolean _isOffHeap;
private final boolean _nullHandlingEnabled;
private final SegmentCommitterFactory _segmentCommitterFactory;
- private final ConsumptionRateLimiter _rateLimiter;
+ private final ConsumptionRateLimiter _partitionRateLimiter;
+ private final ConsumptionRateLimiter _serverRateLimiter;
private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
private final CompletionMode _segmentCompletionMode;
@@ -516,7 +517,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
*/
private boolean processStreamEvents(MessageBatch messagesAndOffsets, long
idlePipeSleepTimeMillis) {
int messageCount = messagesAndOffsets.getMessageCount();
- _rateLimiter.throttle(messageCount);
+ _partitionRateLimiter.throttle(messageCount);
+ _serverRateLimiter.throttle(messageCount);
PinotMeter realtimeRowsConsumedMeter = null;
PinotMeter realtimeRowsDroppedMeter = null;
@@ -605,6 +607,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
realtimeRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
+
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
} catch (Exception e) {
_numRowsErrored++;
String errorMessage = String.format("Caught exception while
indexing the record: %s", transformedRow);
@@ -1395,8 +1398,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_memoryManager = new DirectMemoryManager(_segmentNameStr,
_serverMetrics);
}
- _rateLimiter = RealtimeConsumptionRateManager.getInstance()
+ _partitionRateLimiter = RealtimeConsumptionRateManager.getInstance()
.createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics,
_clientId);
+ _serverRateLimiter =
RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
index 21c58f9afa..325066fa16 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java
@@ -27,14 +27,12 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.MetricEmitter;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER;
-import static
org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -51,6 +49,10 @@ public class RealtimeConsumptionRateManagerTest {
private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class);
+ private static final PinotConfiguration SERVER_CONFIG_1 =
mock(PinotConfiguration.class);
+ private static final PinotConfiguration SERVER_CONFIG_2 =
mock(PinotConfiguration.class);
+ private static final PinotConfiguration SERVER_CONFIG_3 =
mock(PinotConfiguration.class);
+ private static final PinotConfiguration SERVER_CONFIG_4 =
mock(PinotConfiguration.class);
private static RealtimeConsumptionRateManager _consumptionRateManager;
static {
@@ -65,6 +67,15 @@ public class RealtimeConsumptionRateManagerTest {
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
_consumptionRateManager = new RealtimeConsumptionRateManager(cache);
+
+
when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
+
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
+
when(SERVER_CONFIG_3.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(0.0);
+
when(SERVER_CONFIG_4.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
+
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(-1.0);
}
@Test
@@ -83,7 +94,27 @@ public class RealtimeConsumptionRateManagerTest {
}
@Test
- public void testBuildCache() throws Exception {
+ public void testCreateServerRateLimiter() {
+ // Server config 1
+ ConsumptionRateLimiter rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null);
+ assertEquals(5.0, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);
+
+ // Server config 2
+ rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null);
+ assertEquals(2.5, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);
+
+ // Server config 3
+ rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null);
+ assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+
+ // Server config 4
+ rateLimiter =
_consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null);
+ assertEquals(rateLimiter, NOOP_RATE_LIMITER);
+ }
+
+ @Test
+ public void testBuildCache()
+ throws Exception {
PartitionCountFetcher partitionCountFetcher =
mock(PartitionCountFetcher.class);
LoadingCache<StreamConfig, Integer> cache =
buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS);
when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10);
@@ -150,21 +181,21 @@ public class RealtimeConsumptionRateManagerTest {
now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"),
ZoneOffset.UTC).instant();
int sumOfMsgsInPrevMinute = sum(numMsgs);
int expectedRatio = calcExpectedRatio(rateLimitInMinutes,
sumOfMsgsInPrevMinute);
- numMsgs = new int[] {35};
+ numMsgs = new int[]{35};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now),
expectedRatio);
// 3rd minute
now = Clock.fixed(Instant.parse("2022-08-10T12:02:25Z"),
ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes,
sumOfMsgsInPrevMinute);
- numMsgs = new int[] {0};
+ numMsgs = new int[]{0};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now),
expectedRatio);
// 4th minute
now = Clock.fixed(Instant.parse("2022-08-10T12:03:15Z"),
ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes,
sumOfMsgsInPrevMinute);
- numMsgs = new int[] {10, 20};
+ numMsgs = new int[]{10, 20};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now),
expectedRatio);
now = Clock.fixed(Instant.parse("2022-08-10T12:03:20Z"),
ZoneOffset.UTC).instant();
assertEquals(metricEmitter.emitMetric(numMsgs[1], rateLimit, now),
expectedRatio);
@@ -173,7 +204,7 @@ public class RealtimeConsumptionRateManagerTest {
now = Clock.fixed(Instant.parse("2022-08-10T12:04:30Z"),
ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes,
sumOfMsgsInPrevMinute);
- numMsgs = new int[] {5};
+ numMsgs = new int[]{5};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now),
expectedRatio);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
new file mode 100644
index 0000000000..ca1253a0f6
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class RealtimeConsumptionRateLimiterClusterIntegrationTest extends
BaseRealtimeClusterIntegrationTest {
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(RealtimeConsumptionRateLimiterClusterIntegrationTest.class);
+
+ private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
+ private static final long RANDOM_SEED = System.currentTimeMillis();
+ private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final double SERVER_RATE_LIMIT = 100;
+
+ private final boolean _isDirectAlloc = RANDOM.nextBoolean();
+ private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
+ private List<File> _avroFiles;
+
+ @Override
+ protected String getLoadMode() {
+ return ReadMode.mmap.name();
+ }
+
+ @Override
+ public void startController()
+ throws Exception {
+ super.startController();
+
enableResourceConfigForLeadControllerResource(_enableLeadControllerResource);
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration configuration) {
+
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION,
true);
+
configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION,
_isDirectAlloc);
+ if (_isConsumerDirConfigured) {
+ configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR,
CONSUMER_DIRECTORY);
+ }
+
configuration.setProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
SERVER_RATE_LIMIT);
+ }
+
+ @Override
+ protected IngestionConfig getIngestionConfig() {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(
+ new
StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
+ return ingestionConfig;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // all the data that was ingested from Kafka also got uploaded via the
controller's upload endpoint
+ return super.getCountStarResult() * 2;
+ }
+
+ @BeforeClass
+ @Override
+ public void setUp()
+ throws Exception {
+ // Remove the consumer directory
+ FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY));
+
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ // Start Kafka
+ startKafka();
+
+ // Unpack the Avro files
+ _avroFiles = unpackAvroData(_tempDir);
+
+ // Push data into Kafka
+ pushAvroIntoKafka(_avroFiles);
+ }
+
+ @AfterClass
+ @Override
+ public void tearDown()
+ throws Exception {
+ FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY));
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Test
+ public void testOneTableRateLimit()
+ throws Exception {
+ String tableName = getTableName();
+ try {
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+ long startTime = System.currentTimeMillis();
+ TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
+ addTableConfig(tableConfig);
+ for (int i = 0; i < 60; i++) {
+ if (!isTableLoaded(tableName)) {
+ Thread.sleep(1000L);
+ } else {
+ break;
+ }
+ }
+ PinotMeter realtimeRowConsumedMeter =
ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
+ long startCount = getCurrentCountStarResult(tableName);
+ for (int i = 1; i <= 10; i++) {
+ Thread.sleep(1000L);
+ long currentCount = getCurrentCountStarResult(tableName);
+ double currentRate = (currentCount - startCount) / (double)
(System.currentTimeMillis() - startTime) * 1000;
+ LOGGER.info("Second = " + i + ", realtimeRowConsumedMeter = " +
realtimeRowConsumedMeter.oneMinuteRate()
+ + ", currentCount = " + currentCount + ", currentRate = " +
currentRate);
+ Assert.assertTrue(realtimeRowConsumedMeter.oneMinuteRate() <
SERVER_RATE_LIMIT,
+ "Rate should be less than " + SERVER_RATE_LIMIT);
+ Assert.assertTrue(currentRate < SERVER_RATE_LIMIT * 1.5, // Put some
leeway for the rate calculation
+ "Rate should be less than " + SERVER_RATE_LIMIT);
+ }
+ } finally {
+ dropRealtimeTable(tableName);
+
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName));
+ }
+ }
+
+ @Test
+ public void testTwoTableRateLimit()
+ throws Exception {
+ String tableName1 = "testTable1";
+ String tableName2 = "testTable2";
+
+ try {
+ // Create and upload the schema and table config
+ Schema schema1 = createSchema();
+ schema1.setSchemaName("testTable1");
+ addSchema(schema1);
+ Schema schema2 = createSchema();
+ schema2.setSchemaName("testTable2");
+ addSchema(schema2);
+ long startTime = System.currentTimeMillis();
+
+ TableConfig tableConfig1 = createRealtimeTableConfig(tableName1);
+ addTableConfig(tableConfig1);
+ TableConfig tableConfig2 = createRealtimeTableConfig(tableName2);
+ addTableConfig(tableConfig2);
+ for (int i = 0; i < 60; i++) {
+ if (!isTableLoaded(tableName1) || !isTableLoaded(tableName2)) {
+ Thread.sleep(1000L);
+ } else {
+ break;
+ }
+ }
+
+ PinotMeter serverRowConsumedMeter =
ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED);
+ long startCount1 = getCurrentCountStarResult(tableName1);
+ long startCount2 = getCurrentCountStarResult(tableName2);
+ for (int i = 1; i <= 10; i++) {
+ Thread.sleep(1000L);
+ long currentCount1 = getCurrentCountStarResult(tableName1);
+ long currentCount2 = getCurrentCountStarResult(tableName2);
+ long currentServerCount = currentCount1 + currentCount2;
+ long currentTimeMillis = System.currentTimeMillis();
+ double currentRate1 = (currentCount1 - startCount1) / (double)
(currentTimeMillis - startTime) * 1000;
+ double currentRate2 = (currentCount2 - startCount2) / (double)
(currentTimeMillis - startTime) * 1000;
+ double currentServerRate = currentRate1 + currentRate2;
+ LOGGER.info("Second = " + i + ", serverRowConsumedMeter = " +
serverRowConsumedMeter.oneMinuteRate()
+ + ", currentCount1 = " + currentCount1 + ", currentRate1 = " +
currentRate1
+ + ", currentCount2 = " + currentCount2 + ", currentRate2 = " +
currentRate2
+ + ", currentServerCount = " + currentServerCount + ",
currentServerRate = " + currentServerRate
+ );
+
+ Assert.assertTrue(serverRowConsumedMeter.oneMinuteRate() <
SERVER_RATE_LIMIT,
+ "Rate should be less than " + SERVER_RATE_LIMIT + ",
serverOneMinuteRate = " + serverRowConsumedMeter
+ .oneMinuteRate());
+ Assert.assertTrue(currentServerRate < SERVER_RATE_LIMIT * 1.5,
+ // Put some leeway for the rate calculation
+ "Whole table ingestion rate should be less than " +
SERVER_RATE_LIMIT + ", currentRate1 = " + currentRate1
+ + ", currentRate2 = " + currentRate2 + ", currentServerRate =
" + currentServerRate);
+ }
+ } finally {
+ dropRealtimeTable(tableName1);
+
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName1));
+ dropRealtimeTable(tableName2);
+
waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName2));
+ }
+ }
+
+ protected TableConfig createRealtimeTableConfig() {
+ return createRealtimeTableConfig(getTableName());
+ }
+
+ protected TableConfig createRealtimeTableConfig(String tableName) {
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
+
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
+
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
+
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
+
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
+
.setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant())
+
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
+
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+ }
+
+ private boolean isTableLoaded(String tableName) {
+ try {
+ return getCurrentCountStarResult(tableName) > 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ protected Map<String, String> getStreamConfigs() {
+ return null;
+ }
+
+ @Test(enabled = false)
+ public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testGeneratedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedQueries(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testInstanceShutdown() {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testQueryExceptions(boolean useMultiStageQueryEngine) {
+ // Do nothing
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedServerPartitionedSqlQueries() {
+ // Do nothing
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 2b771707d2..10d9e6bec1 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -205,7 +205,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
-
// Set data table version send to broker.
int dataTableVersion =
_serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION,
DataTableBuilderFactory.DEFAULT_VERSION);
@@ -573,6 +572,10 @@ public abstract class BaseServerStarter implements
ServiceStartable {
ServerConf serverConf = new ServerConf(_serverConf);
_serverInstance = new ServerInstance(serverConf, _helixManager,
accessControlFactory);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
+
+ // Enable Server level realtime ingestion rate limier
+
RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf,
serverMetrics);
+
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
// initialize the thread accountant for query killing
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1ba40c9003..190de27151 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -619,6 +619,11 @@ public class CommonConstants {
// This is also the default in the case a user misconfigures this by
setting to <= 0.
public static final int DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS = 10000;
+ // Config for realtime consumption message rate limit
+ public static final String CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT =
"pinot.server.consumption.rate.limit";
+ // Default to 0.0 (no limit)
+ public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
+
public static final String DEFAULT_READ_MODE = "mmap";
// Whether to reload consuming segment on scheme update
public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]