This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c418442c49 Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337)
c418442c49 is described below
commit c418442c498865c35b5290ecc41f38743b3f0bcd
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jun 7 12:52:42 2024 -0700
Fix flakiness of ControllerPeriodicTasksIntegrationTest (#13337)
---
.../ControllerPeriodicTasksIntegrationTest.java | 122 +++++++++++----------
1 file changed, 63 insertions(+), 59 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
index 03a2b6a000..9e58028146 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -62,10 +63,10 @@ import static org.testng.Assert.assertTrue;
* The intention of these tests is not to test functionality of daemons, but
simply to check that they run as expected
* and process the tables when the controller starts.
*/
+// TODO: Add tests for other ControllerPeriodicTasks (RetentionManager,
RealtimeSegmentValidationManager).
public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrationTestSet {
private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 30;
- private static final int PERIODIC_TASK_FREQUENCY_SECONDS = 5;
- private static final String PERIODIC_TASK_FREQUENCY = "5s";
+ private static final String PERIODIC_TASK_FREQUENCY_PERIOD = "5s";
private static final String PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD = "5s";
private static final int NUM_REPLICAS = 2;
@@ -115,22 +116,20 @@ public class ControllerPeriodicTasksIntegrationTest
extends BaseClusterIntegrati
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
- properties
-
.put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-
properties.put(ControllerPeriodicTasksConf.DEPRECATED_STATUS_CHECKER_FREQUENCY_IN_SECONDS,
- PERIODIC_TASK_FREQUENCY_SECONDS);
-
properties.put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
PERIODIC_TASK_INITIAL_DELAY_SECONDS);
- properties
-
.put(ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY,
PERIODIC_TASK_FREQUENCY);
+
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_PERIOD,
PERIODIC_TASK_FREQUENCY_PERIOD);
+
properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS,
+ PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+
properties.put(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD,
PERIODIC_TASK_FREQUENCY_PERIOD);
properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_INITIAL_DELAY_IN_SECONDS,
PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-
properties.put(ControllerPeriodicTasksConf.DEPRECATED_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
- PERIODIC_TASK_FREQUENCY_SECONDS);
+
properties.put(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_PERIOD,
+ PERIODIC_TASK_FREQUENCY_PERIOD);
properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
PERIODIC_TASK_INITIAL_DELAY_SECONDS);
-
properties.put(ControllerPeriodicTasksConf.DEPRECATED_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
- PERIODIC_TASK_FREQUENCY_SECONDS);
+
properties.put(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_PERIOD,
+ PERIODIC_TASK_FREQUENCY_PERIOD);
properties.put(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_PERIOD,
PERIODIC_TASK_WAIT_FOR_PUSH_TIME_PERIOD);
@@ -160,8 +159,8 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
// Create and upload segments
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema,
0, _segmentDir, _tarDir);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles,
offlineTableConfig, schema, 0, _segmentDir,
+ _tarDir);
uploadSegments(getTableName(), _tarDir);
// Push data into Kafka
@@ -228,44 +227,40 @@ public class ControllerPeriodicTasksIntegrationTest
extends BaseClusterIntegrati
_currentTable = DEFAULT_TABLE_NAME;
int numTables = 6;
- ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
TestUtils.waitForCondition(aVoid -> {
- if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
"SegmentStatusChecker",
- ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED) != numTables) {
+ if
(!checkGlobalGaugeValue(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
"SegmentStatusChecker",
+ numTables)) {
return false;
}
- if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
- null, NUM_REPLICAS, 100, 0, 100)) {
+ if
(!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(emptyTable),
null, NUM_REPLICAS,
+ 100, 0, 100)) {
return false;
}
- if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
- TableNameBuilder.OFFLINE.tableNameWithType(disabledTable), null, 0,
0, 0, 0)) {
+ if
(!checkSegmentStatusCheckerMetrics(TableNameBuilder.OFFLINE.tableNameWithType(disabledTable),
null, 0, 0, 0,
+ 0)) {
return false;
}
String tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
IdealState idealState =
_helixResourceManager.getTableIdealState(tableNameWithType);
- if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
tableNameWithType, idealState, NUM_REPLICAS, 100, 0,
- 100)) {
+ if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState,
NUM_REPLICAS, 100, 0, 100)) {
return false;
}
tableNameWithType =
TableNameBuilder.OFFLINE.tableNameWithType(tableWithOfflineSegment);
idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
//noinspection PointlessArithmeticExpression
- if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
tableNameWithType, idealState, NUM_REPLICAS - 1,
+ if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState,
NUM_REPLICAS - 1,
100 * (NUM_REPLICAS - 1) / NUM_REPLICAS, 0, 100)) {
return false;
}
tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
idealState = _helixResourceManager.getTableIdealState(tableNameWithType);
- if (!checkSegmentStatusCheckerMetrics(controllerMetrics,
tableNameWithType, idealState, NUM_REPLICAS, 100, 0,
- 100)) {
+ if (!checkSegmentStatusCheckerMetrics(tableNameWithType, idealState,
NUM_REPLICAS, 100, 0, 100)) {
return false;
}
- return MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
ControllerGauge.OFFLINE_TABLE_COUNT) == 4
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
ControllerGauge.REALTIME_TABLE_COUNT) == 2
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
ControllerGauge.DISABLED_TABLE_COUNT) == 1
- && MetricValueUtils.getGlobalGaugeValue(controllerMetrics,
ControllerGauge.UPSERT_TABLE_COUNT) == 1;
- }, 60_000, "Timed out waiting for SegmentStatusChecker");
+ return checkGlobalGaugeValue(ControllerGauge.OFFLINE_TABLE_COUNT, 4) &&
checkGlobalGaugeValue(
+ ControllerGauge.REALTIME_TABLE_COUNT, 2) &&
checkGlobalGaugeValue(ControllerGauge.DISABLED_TABLE_COUNT, 1)
+ && checkGlobalGaugeValue(ControllerGauge.UPSERT_TABLE_COUNT, 1);
+ }, 600_000, "Timed out waiting for SegmentStatusChecker");
dropOfflineTable(emptyTable);
dropOfflineTable(disabledTable);
@@ -288,31 +283,40 @@ public class ControllerPeriodicTasksIntegrationTest
extends BaseClusterIntegrati
addTableConfig(tableConfig);
}
- private boolean checkSegmentStatusCheckerMetrics(ControllerMetrics
controllerMetrics, String tableNameWithType,
- IdealState idealState, long expectedNumReplicas, long
expectedPercentReplicas, long expectedSegmentsInErrorState,
+ private boolean checkGlobalGaugeValue(ControllerGauge gauge, long
expectedValue) {
+ return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(),
gauge) == expectedValue;
+ }
+
+ private boolean checkGlobalGaugeValue(ControllerGauge gauge, String key,
long expectedValue) {
+ return MetricValueUtils.getGlobalGaugeValue(ControllerMetrics.get(), key,
gauge) == expectedValue;
+ }
+
+ private boolean checkTableGaugeValue(ControllerGauge gauge, String
tableNameWithType, long expectedValue) {
+ return MetricValueUtils.getTableGaugeValue(ControllerMetrics.get(),
tableNameWithType, gauge) == expectedValue;
+ }
+
+ private boolean checkSegmentStatusCheckerMetrics(String tableNameWithType,
@Nullable IdealState idealState,
+ long expectedNumReplicas, long expectedPercentReplicas, long
expectedSegmentsInErrorState,
long expectedPercentSegmentsAvailable) {
if (idealState != null) {
- if (MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType,
- ControllerGauge.IDEALSTATE_ZNODE_SIZE) !=
idealState.toString().length()) {
+ if (!checkTableGaugeValue(ControllerGauge.IDEALSTATE_ZNODE_SIZE,
tableNameWithType,
+ idealState.toString().length())) {
return false;
}
- if (MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType, ControllerGauge.SEGMENT_COUNT)
- != idealState.getPartitionSet().size()) {
+ if (!checkTableGaugeValue(ControllerGauge.SEGMENT_COUNT,
tableNameWithType,
+ idealState.getPartitionSet().size())) {
return false;
}
}
- return MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType,
- ControllerGauge.NUMBER_OF_REPLICAS) == expectedNumReplicas
- && MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType,
- ControllerGauge.PERCENT_OF_REPLICAS) == expectedPercentReplicas
- && MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType,
- ControllerGauge.SEGMENTS_IN_ERROR_STATE) ==
expectedSegmentsInErrorState
- && MetricValueUtils.getTableGaugeValue(controllerMetrics,
tableNameWithType,
- ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) ==
expectedPercentSegmentsAvailable;
+ return checkTableGaugeValue(ControllerGauge.NUMBER_OF_REPLICAS,
tableNameWithType, expectedNumReplicas)
+ && checkTableGaugeValue(ControllerGauge.PERCENT_OF_REPLICAS,
tableNameWithType, expectedPercentReplicas)
+ && checkTableGaugeValue(ControllerGauge.SEGMENTS_IN_ERROR_STATE,
tableNameWithType,
+ expectedSegmentsInErrorState) &&
checkTableGaugeValue(ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+ tableNameWithType, expectedPercentSegmentsAvailable);
}
@Test
- public void testRealtimeSegmentRelocator()
+ public void testSegmentRelocator()
throws Exception {
// Add relocation tenant config
TableConfig realtimeTableConfig = getRealtimeTableConfig();
@@ -339,7 +343,7 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
}
}
return Collections.disjoint(consumingServers, completedServers);
- }, 60_000, "Timed out waiting for RealtimeSegmentRelocation");
+ }, 600_000, "Timed out waiting for SegmentRelocator");
}
@Test
@@ -358,7 +362,7 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin,
helixClusterName);
assertNotNull(idealState);
return
idealState.getInstanceSet(tableNameWithType).equals(brokersAfterAdd);
- }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+ }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
// Drop the new added broker
_helixAdmin.dropInstance(helixClusterName, instanceConfig);
@@ -369,7 +373,7 @@ public class ControllerPeriodicTasksIntegrationTest extends
BaseClusterIntegrati
IdealState idealState = HelixHelper.getBrokerIdealStates(_helixAdmin,
helixClusterName);
assertNotNull(idealState);
return
idealState.getInstanceSet(tableNameWithType).equals(brokersAfterDrop);
- }, 60_000L, "Timeout when waiting for BrokerResourceValidationManager");
+ }, 600_000L, "Timeout when waiting for BrokerResourceValidationManager");
}
@Test
@@ -380,15 +384,15 @@ public class ControllerPeriodicTasksIntegrationTest
extends BaseClusterIntegrati
// Wait until OfflineSegmentIntervalChecker gets executed
TestUtils.waitForCondition(aVoid -> {
- long numSegments =
-
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
"SegmentCount"));
- long numMissingSegments =
-
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
"missingSegmentCount"));
- long numTotalDocs =
-
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
"TotalDocumentCount"));
- return numSegments == NUM_OFFLINE_AVRO_FILES && numMissingSegments == 0
&& numTotalDocs == 79003;
- }, 60_000, "Timed out waiting for OfflineSegmentIntervalChecker");
+ return checkValidationGaugeValue(validationMetrics, tableNameWithType,
"SegmentCount", NUM_OFFLINE_AVRO_FILES)
+ && checkValidationGaugeValue(validationMetrics, tableNameWithType,
"missingSegmentCount", 0)
+ && checkValidationGaugeValue(validationMetrics, tableNameWithType,
"TotalDocumentCount", 79003);
+ }, 600_000, "Timed out waiting for OfflineSegmentIntervalChecker");
}
- // TODO: tests for other ControllerPeriodicTasks (RetentionManager,
RealtimeSegmentValidationManager)
+ private boolean checkValidationGaugeValue(ValidationMetrics
validationMetrics, String tableNameWithType,
+ String gaugeName, long expectedValue) {
+ return
validationMetrics.getValueOfGauge(ValidationMetrics.makeGaugeName(tableNameWithType,
gaugeName))
+ == expectedValue;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]