yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2160710208
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1403,4 +1700,40 @@ private void waitForReloadToComplete(String reloadJobId,
long timeoutMs) {
}
}, 1000L, timeoutMs, "Failed to reload all segments");
}
+
+ private void waitForRebalanceToComplete(String rebalanceJobId, long
timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(rebalanceJobId);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerRebalanceJobStatusResponse rebalanceStatus =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ return rebalanceStatus.getTableRebalanceProgressStats().getStatus() ==
RebalanceResult.Status.DONE;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to complete rebalance");
+ }
+
+ private void waitForTableEVISConverge(String tableName, long timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forIdealState(tableName);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ TableViews.TableView idealState =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
TableViews.TableView.class);
+
+ requestUrl = getControllerRequestURLBuilder().forIdealState(tableName);
Review Comment:
Well, we were only checking if IS = EV (ideal state = external view) in this
method, right? And IS would always be equal to IS 😄
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,284 @@ public void testRebalanceJobZkMetadataCleanup()
_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+ try {
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
assertEquals(summary.getSegmentInfo().getConsumingSegmentToBeMovedSummary().getNumConsumingSegmentsToBeMoved(),
+ 4);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
+ // test: move segment from tenantB to tenantA with downtime
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ assertFalse(originalConsumingSegmentsToMove.isEmpty());
+
+ waitForTableEVISConverge(getTableName(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment
is not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+ } finally {
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
+ }
+ }
+
+ @Test
+ void testForceCommitStrictReplicaGroup()
+ throws Exception {
+ final String tenantA = "tenantA_strictRG";
+ final String tenantB = "tenantB_strictRG";
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+
+ try {
+ // Prepare the table to replicate segments across two servers on tenantA
+ TableConfig tableConfig = getRealtimeTableConfig();
+ tableConfig.setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
+ false));
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+
+ updateTableConfig(tableConfig);
+
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
assertEquals(summary.getSegmentInfo().getConsumingSegmentToBeMovedSummary().getNumConsumingSegmentsToBeMoved(),
+ 4);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertFalse(originalConsumingSegmentsToMove.isEmpty());
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
+ // test: move segment from tenantB to tenantA with batch size
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setBatchSizePerServer(1);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertFalse(originalConsumingSegmentsToMove.isEmpty());
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment
is not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
Review Comment:
IMO this should be in the finally block as well, since the other tests
probably won't pass with the table in an unexpected tenant.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,294 @@ public void testRebalanceJobZkMetadataCleanup()
_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantB to tenantA with downtime
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
+ waitForTableEVISConverge(getTableName(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal = new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment is
not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
+ }
+
+ @Test
+ void testForceCommitStrictReplicaGroup()
Review Comment:
Yeah, but it seems like most of the code is common, so I think it'd be good
to refactor it to use [parameters](https://testng.org/parameters.html) and
share the common code. It's okay for the different parts to simply be in an
if-else block based on the parameter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]