yashmayya commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2156422714
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -673,6 +673,14 @@ public RebalanceResult rebalance(
@QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
@ApiParam(value = "Whether to update segment target tier as part of the
rebalance") @DefaultValue("false")
@QueryParam("updateTargetTier") boolean updateTargetTier,
+ @ApiParam(value = "Do force commit on consuming segments before they are
rebalanced") @DefaultValue("false")
Review Comment:
```suggestion
@ApiParam(value = "Whether to force commit consuming segments for a
REALTIME table before they are rebalanced.") @DefaultValue("false")
```
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1403,4 +1700,40 @@ private void waitForReloadToComplete(String reloadJobId,
long timeoutMs) {
}
}, 1000L, timeoutMs, "Failed to reload all segments");
}
+
+ private void waitForRebalanceToComplete(String rebalanceJobId, long
timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(rebalanceJobId);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerRebalanceJobStatusResponse rebalanceStatus =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ return rebalanceStatus.getTableRebalanceProgressStats().getStatus() ==
RebalanceResult.Status.DONE;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to complete rebalance");
+ }
+
+ private void waitForTableEVISConverge(String tableName, long timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forIdealState(tableName);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ TableViews.TableView idealState =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
TableViews.TableView.class);
+
+ requestUrl = getControllerRequestURLBuilder().forIdealState(tableName);
+ httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ TableViews.TableView externalView =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
TableViews.TableView.class);
+ return idealState._realtime.equals(externalView._realtime) &&
idealState._offline.equals(externalView._offline);
+ } catch (Exception e) {
+ return null;
+ }
Review Comment:
Same comment as above.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,294 @@ public void testRebalanceJobZkMetadataCleanup()
_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
Review Comment:
Was there supposed to be another assertion on
`originalConsumingSegmentsToMove` here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -673,6 +673,14 @@ public RebalanceResult rebalance(
@QueryParam("retryInitialDelayInMs") long retryInitialDelayInMs,
@ApiParam(value = "Whether to update segment target tier as part of the
rebalance") @DefaultValue("false")
@QueryParam("updateTargetTier") boolean updateTargetTier,
+ @ApiParam(value = "Do force commit on consuming segments before they are
rebalanced") @DefaultValue("false")
+ @QueryParam("forceCommit") boolean forceCommit,
+ @ApiParam(value = "Batch size for force commit operations")
@DefaultValue("2147483647")
+ @QueryParam("forceCommitBatchSize") int forceCommitBatchSize,
+ @ApiParam(value = "Interval in milliseconds for checking force commit
batch status") @DefaultValue("5000")
+ @QueryParam("forceCommitBatchStatusCheckIntervalMs") int
forceCommitBatchStatusCheckIntervalMs,
+ @ApiParam(value = "Timeout in milliseconds for force commit batch status
check") @DefaultValue("180000")
Review Comment:
Let's extract these default values as constants and share them with the force
commit API -
https://github.com/apache/pinot/blob/60f1897d95024db29b820fdd87c1962809f2e9fb/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java#L171
Also, the `forceCommit` API defines its status check interval and timeout
values in seconds - are we using `ms` here for consistency with the other
rebalance params? Even in that case, let's use the common constants *
1000.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1403,4 +1700,40 @@ private void waitForReloadToComplete(String reloadJobId,
long timeoutMs) {
}
}, 1000L, timeoutMs, "Failed to reload all segments");
}
+
+ private void waitForRebalanceToComplete(String rebalanceJobId, long
timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(rebalanceJobId);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerRebalanceJobStatusResponse rebalanceStatus =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ return rebalanceStatus.getTableRebalanceProgressStats().getStatus() ==
RebalanceResult.Status.DONE;
+ } catch (Exception e) {
+ return null;
+ }
Review Comment:
Why are we swallowing the exception here? Shouldn't we fail the test on an
exception (like `TestUtils.waitForCondition` does)? Also, we should at
least log the exception to make debugging unexpected failures easier.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java:
##########
@@ -136,6 +136,22 @@ public class RebalanceConfig {
@ApiModelProperty(example = "300000")
private long _retryInitialDelayInMs = 300000L;
+ @JsonProperty("forceCommit")
+ @ApiModelProperty(example = "false")
+ private boolean _forceCommit = false;
+
+ @JsonProperty("forceCommitBatchSize")
+ @ApiModelProperty(example = "2147483647")
+ private int _forceCommitBatchSize = Integer.MAX_VALUE;
+
+ @JsonProperty("forceCommitBatchStatusCheckIntervalMs")
+ @ApiModelProperty(example = "5000")
+ private int _forceCommitBatchStatusCheckIntervalMs = 5000;
+
+ @JsonProperty("forceCommitBatchStatusCheckTimeoutMs")
+ @ApiModelProperty(example = "180000")
+ private int _forceCommitBatchStatusCheckTimeoutMs = 180000;
Review Comment:
Same comment as above - these should be using common shared constants.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,294 @@ public void testRebalanceJobZkMetadataCleanup()
_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantB to tenantA with downtime
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
+ waitForTableEVISConverge(getTableName(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal = new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment is
not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
Review Comment:
Can this cleanup (stopping the servers and restoring the original table
config) be moved into the `finally` part of a try-finally block? If there's
any failure in this test, it shouldn't affect the remaining tests in the
suite.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1403,4 +1700,40 @@ private void waitForReloadToComplete(String reloadJobId,
long timeoutMs) {
}
}, 1000L, timeoutMs, "Failed to reload all segments");
}
+
+ private void waitForRebalanceToComplete(String rebalanceJobId, long
timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(rebalanceJobId);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerRebalanceJobStatusResponse rebalanceStatus =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ return rebalanceStatus.getTableRebalanceProgressStats().getStatus() ==
RebalanceResult.Status.DONE;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to complete rebalance");
+ }
+
+ private void waitForTableEVISConverge(String tableName, long timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forIdealState(tableName);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ TableViews.TableView idealState =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
TableViews.TableView.class);
+
+ requestUrl = getControllerRequestURLBuilder().forIdealState(tableName);
Review Comment:
Should this be `forExternalView`?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,294 @@ public void testRebalanceJobZkMetadataCleanup()
_helixResourceManager.getControllerJobZKMetadata(inProgressJobId,
ControllerJobTypes.TABLE_REBALANCE));
}
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+ BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantB to tenantA with downtime
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
+ waitForTableEVISConverge(getTableName(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal = new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment is
not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
+ }
+
+ @Test
+ void testForceCommitStrictReplicaGroup()
Review Comment:
This looks nearly identical to the above test - can it be refactored into a
parameterized test?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java:
##########
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.ControllerConf;
+import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
+import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TableRebalancePauselessIntegrationTest extends
BasePauselessRealtimeIngestionTest {
+ @Override
+ protected String getFailurePoint() {
+ return null; // No failure point for basic test
+ }
+
+ @Override
+ protected int getExpectedSegmentsWithFailure() {
+ return NUM_REALTIME_SEGMENTS; // Always expect full segments
+ }
+
+ @Override
+ protected int getExpectedZKMetadataWithFailure() {
+ return NUM_REALTIME_SEGMENTS; // Always expect full metadata
+ }
+
+ @Override
+ protected long getCountStarResultWithFailure() {
+ return DEFAULT_COUNT_STAR_RESULT; // Always expect full count
+ }
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+ properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+ }
+
+ @Override
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+ createServerTenant(getServerTenant(), 0, 1);
+ createBrokerTenant(getBrokerTenant(), 1);
+ setMaxSegmentCompletionTimeMillis();
+ setupNonPauselessTable();
+ injectFailure();
+ setupPauselessTable();
+ waitForAllDocsLoaded(600_000L); // Disable the pre-checks for the
rebalance job
+ }
+
+ private static String getQueryString(RebalanceConfig rebalanceConfig) {
+ return "dryRun=" + rebalanceConfig.isDryRun() + "&preChecks=" +
rebalanceConfig.isPreChecks()
+ + "&reassignInstances=" + rebalanceConfig.isReassignInstances()
+ + "&includeConsuming=" + rebalanceConfig.isIncludeConsuming()
+ + "&minimizeDataMovement=" + rebalanceConfig.getMinimizeDataMovement()
+ + "&bootstrap=" + rebalanceConfig.isBootstrap() + "&downtime=" +
rebalanceConfig.isDowntime()
+ + "&minAvailableReplicas=" + rebalanceConfig.getMinAvailableReplicas()
+ + "&bestEfforts=" + rebalanceConfig.isBestEfforts()
+ + "&batchSizePerServer=" + rebalanceConfig.getBatchSizePerServer()
+ + "&externalViewCheckIntervalInMs=" +
rebalanceConfig.getExternalViewCheckIntervalInMs()
+ + "&externalViewStabilizationTimeoutInMs=" +
rebalanceConfig.getExternalViewStabilizationTimeoutInMs()
+ + "&updateTargetTier=" + rebalanceConfig.isUpdateTargetTier()
+ + "&heartbeatIntervalInMs=" +
rebalanceConfig.getHeartbeatIntervalInMs()
+ + "&heartbeatTimeoutInMs=" + rebalanceConfig.getHeartbeatTimeoutInMs()
+ + "&maxAttempts=" + rebalanceConfig.getMaxAttempts()
+ + "&retryInitialDelayInMs=" +
rebalanceConfig.getRetryInitialDelayInMs()
+ + "&forceCommit=" + rebalanceConfig.isForceCommit();
+ }
+
+ private String getRebalanceUrl(RebalanceConfig rebalanceConfig, TableType
tableType) {
+ return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(),
"tables", getTableName(), "rebalance")
+ + "?type=" + tableType.toString() + "&" +
getQueryString(rebalanceConfig);
+ }
+
+ @Test
+ public void testForceCommit()
+ throws Exception {
+ final String tenantA = "tenantA";
+ final String tenantB = "tenantB";
+
+ TableConfig tableConfig = getRealtimeTableConfig();
+
+ BaseServerStarter serverStarter0 = startOneServer(0);
+ BaseServerStarter serverStarter1 = startOneServer(1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(2);
+ BaseServerStarter serverStarter3 = startOneServer(3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ tableConfig.getValidationConfig().setPeerSegmentDownloadScheme("http");
+ updateTableConfig(tableConfig);
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantB to tenantA with downtime
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 60000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment is
not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(null);
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
+ }
+
+ @Test
+ void testForceCommitStrictReplicaGroup()
+ throws Exception {
+ final String tenantA = "tenantA_strictRG";
+ final String tenantB = "tenantB_strictRG";
+
+ BaseServerStarter serverStarter0 = startOneServer(0);
+ BaseServerStarter serverStarter1 = startOneServer(1);
+ createServerTenant(tenantA, 0, 2);
+
+ BaseServerStarter serverStarter2 = startOneServer(2);
+ BaseServerStarter serverStarter3 = startOneServer(3);
+ createServerTenant(tenantB, 0, 2);
+
+ // Prepare the table to replicate segments across two servers on tenantA
+ TableConfig tableConfig = getRealtimeTableConfig();
+ tableConfig.setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
+ false));
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ tableConfig.getValidationConfig().setReplication("2");
+ tableConfig.getValidationConfig().setPeerSegmentDownloadScheme("http");
+
+ updateTableConfig(tableConfig);
+
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ String response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ RebalanceResult rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ RebalanceSummaryResult summary =
rebalanceResult.getRebalanceSummaryResult();
+ assertEquals(
+
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ // test: move segments from tenantA to tenantB
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ LLCSegmentName consumingSegmentOriginal =
+ new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantB to tenantA with batch size
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setBatchSizePerServer(1);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ summary = rebalanceResult.getRebalanceSummaryResult();
+ originalConsumingSegmentsToMove = summary.getSegmentInfo()
+ .getConsumingSegmentToBeMovedSummary()
+ .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+ .keySet();
+ assertEquals(
+
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ 2);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ consumingSegmentOriginal = new
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber() + 1);
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // test: move segment from tenantA to tenantB with includeConsuming =
false, consuming segment should not be
+ // committed
+
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB,
null));
+ updateTableConfig(tableConfig);
+
+ rebalanceConfig.setForceCommit(true);
+ rebalanceConfig.setIncludeConsuming(false);
+
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ response =
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+ consumingSegmentInfoResponse =
+ JsonUtils.stringToObject(response,
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+ consumingSegmentOriginal = consumingSegmentNow;
+ consumingSegmentNow = new LLCSegmentName(
+
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+ // the sequence number should not increase since the consuming segment is
not committed
+ assertEquals(consumingSegmentNow.getSequenceNumber(),
consumingSegmentOriginal.getSequenceNumber());
+
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+ originalConsumingSegmentsToMove.size());
+
+ // Resume the table
+ tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(),
getServerTenant(), null));
+ tableConfig.getValidationConfig().setReplication("1");
+ tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(null);
+ updateTableConfig(tableConfig);
+ rebalanceConfig.setForceCommit(false);
+ rebalanceConfig.setMinAvailableReplicas(0);
+ rebalanceConfig.setDowntime(false);
+ rebalanceConfig.setIncludeConsuming(true);
+ response = sendPostRequest(getRebalanceUrl(rebalanceConfig,
TableType.REALTIME));
+ rebalanceResult = JsonUtils.stringToObject(response,
RebalanceResult.class);
+ waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+ serverStarter0.stop();
+ serverStarter1.stop();
+ serverStarter2.stop();
+ serverStarter3.stop();
+ }
+
+ private void waitForRebalanceToComplete(String rebalanceJobId, long
timeoutMs) {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(rebalanceJobId);
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+ ServerRebalanceJobStatusResponse rebalanceStatus =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ return rebalanceStatus.getTableRebalanceProgressStats().getStatus() ==
RebalanceResult.Status.DONE;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to complete rebalance");
+ }
+
+ private void waitForTableEVISConverge(String tableName, long timeoutMs) {
Review Comment:
Unused?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -151,6 +151,24 @@ public void onTrigger(Trigger trigger, Map<String,
Map<String, String>> currentS
updatedStatsInZk = true;
}
break;
+ case FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER:
+
_tableRebalanceProgressStats.getRebalanceProgressStatsOverall()._isForceCommittingConsumingSegments
= true;
+
_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep()._isForceCommittingConsumingSegments
= true;
+ trackStatsInZk();
+ updatedStatsInZk = true;
+ break;
+ case FORCE_COMMIT_BEFORE_MOVED_END_TRIGGER:
+ LOGGER.info("force commit for consuming segments for table: {} is
done",
Review Comment:
This log was removed, but I don't see any replacement log in `TableRebalancer`
after a force commit finishes successfully — should one be added there?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
ZNRecord idealStateRecord = idealState.getRecord();
- if (idealStateRecord.getVersion() != expectedVersion) {
- tableRebalanceLogger.info(
- "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
- + "assignment");
- Map<String, Map<String, String>> oldAssignment = currentAssignment;
- currentAssignment = idealStateRecord.getMapFields();
- expectedVersion = idealStateRecord.getVersion();
-
- // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply the
- // same target instance state map for these segments to the new ideal
state as the target assignment
- boolean segmentsToMoveChanged = false;
- if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
- // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment for
- // new added segments is based on the existing assignment
- segmentsToMoveChanged = true;
- } else {
- for (String segment : segmentsToMove) {
- Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
- Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
- // TODO: Consider allowing segment state change from CONSUMING to
ONLINE
- if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
- tableRebalanceLogger.info(
- "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
- + "assignment based on the new IdealState",
- oldInstanceStateMap, currentInstanceStateMap, segment);
- segmentsToMoveChanged = true;
- break;
+ Map<String, Map<String, String>> nextAssignment;
+ boolean needsRecalculation;
+ boolean shouldForceCommit = forceCommitBeforeMoved;
+
+ do {
+ needsRecalculation = false;
+
+ // Step 1: Handle version mismatch and recalculate if needed
+ if (idealStateRecord.getVersion() != expectedVersion) {
+ tableRebalanceLogger.info(
+ "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
+ + "assignment");
+ Map<String, Map<String, String>> oldAssignment = currentAssignment;
+ currentAssignment = idealStateRecord.getMapFields();
+ expectedVersion = idealStateRecord.getVersion();
+
+ // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply
+ // the same target instance state map for these segments to the new
ideal state as the target assignment
+ boolean segmentsToMoveChanged = false;
+ if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment
+ // for new added segments is based on the existing assignment
+ segmentsToMoveChanged = true;
+ } else {
+ for (String segment : segmentsToMove) {
+ Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
+ Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
+ // TODO: Consider allowing segment state change from CONSUMING
to ONLINE
+ if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+ tableRebalanceLogger.info(
+ "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
+ + "assignment based on the new IdealState",
+ oldInstanceStateMap, currentInstanceStateMap, segment);
+ segmentsToMoveChanged = true;
+ break;
+ }
}
}
- }
- if (segmentsToMoveChanged) {
- try {
- // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
- instancePartitionsMap =
- getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
- tableRebalanceLogger).getLeft();
- tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
- minimizeDataMovement, tableRebalanceLogger).getLeft();
- targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
- tierToInstancePartitionsMap, rebalanceConfig);
- } catch (Exception e) {
- onReturnFailure("Caught exception while re-calculating the target
assignment, aborting the rebalance", e,
- tableRebalanceLogger);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
- "Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ if (segmentsToMoveChanged) {
+ try {
+ // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
+ instancePartitionsMap =
+ getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
+ tableRebalanceLogger).getLeft();
+ tierToInstancePartitionsMap =
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
+ minimizeDataMovement, tableRebalanceLogger).getLeft();
+ targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
+ tierToInstancePartitionsMap, rebalanceConfig);
+ } catch (Exception e) {
+ onReturnFailure("Caught exception while re-calculating the
target assignment, aborting the rebalance", e,
+ tableRebalanceLogger);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
+ "Caught exception while re-calculating the target
assignment: " + e, instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ }
+ } else {
+ tableRebalanceLogger.info(
+ "No state change found for segments to be moved,
re-calculating the target assignment based on the "
+ + "previous target assignment");
+ Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
+ // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
+ targetAssignment = new TreeMap<>(currentAssignment);
+ for (String segment : segmentsToMove) {
+ targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
}
- } else {
- tableRebalanceLogger.info(
- "No state change found for segments to be moved, re-calculating
the target assignment based on the "
- + "previous target assignment");
- Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
- // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
- targetAssignment = new TreeMap<>(currentAssignment);
- for (String segment : segmentsToMove) {
- targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
+
+ // Step 2: Handle force commit if flag is set, then recalculate if
force commit occurred
+ if (shouldForceCommit) {
+ nextAssignment =
+ getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, tableRebalanceLogger);
+ Set<String> consumingSegmentsToMoveNext =
getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+ if (!consumingSegmentsToMoveNext.isEmpty()) {
+ tableRebalanceLogger.info("Force committing {} consuming segments
before moving them",
+ consumingSegmentsToMoveNext.size());
+ needsRecalculation = true;
+ _tableRebalanceObserver.onTrigger(
+
TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_START_TRIGGER, null,
null,
+ null);
+ idealState =
+ forceCommitConsumingSegmentsAndWait(tableNameWithType,
consumingSegmentsToMoveNext,
+ tableRebalanceLogger);
+ idealStateRecord = idealState.getRecord();
+ _tableRebalanceObserver.onTrigger(
+
TableRebalanceObserver.Trigger.FORCE_COMMIT_BEFORE_MOVED_END_TRIGGER, null,
null,
+ new
TableRebalanceObserver.RebalanceContext(consumingSegmentsToMoveNext.size()));
}
+ shouldForceCommit = false; // Only attempt force commit once
}
- }
+ } while (needsRecalculation);
Review Comment:
Cool — we can refactor this in a separate PR if needed; this looks okay for
now.
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalancePauselessIntegrationTest.java:
##########
@@ -0,0 +1,409 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.ControllerConf;
+import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
+import org.apache.pinot.controller.api.resources.TableViews;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TableRebalancePauselessIntegrationTest extends
BasePauselessRealtimeIngestionTest {
+ @Override
+ protected String getFailurePoint() {
+ return null; // No failure point for basic test
+ }
+
+ @Override
+ protected int getExpectedSegmentsWithFailure() {
+ return NUM_REALTIME_SEGMENTS; // Always expect full segments
+ }
+
+ @Override
+ protected int getExpectedZKMetadataWithFailure() {
+ return NUM_REALTIME_SEGMENTS; // Always expect full metadata
+ }
+
+ @Override
+ protected long getCountStarResultWithFailure() {
+ return DEFAULT_COUNT_STAR_RESULT; // Always expect full count
+ }
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+ properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
+ }
+
+ @Override
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+ createServerTenant(getServerTenant(), 0, 1);
+ createBrokerTenant(getBrokerTenant(), 1);
+ setMaxSegmentCompletionTimeMillis();
+ setupNonPauselessTable();
+ injectFailure();
+ setupPauselessTable();
+ waitForAllDocsLoaded(600_000L); // Disable the pre-checks for the
rebalance job
+ }
+
+ private static String getQueryString(RebalanceConfig rebalanceConfig) {
+ return "dryRun=" + rebalanceConfig.isDryRun() + "&preChecks=" +
rebalanceConfig.isPreChecks()
+ + "&reassignInstances=" + rebalanceConfig.isReassignInstances()
+ + "&includeConsuming=" + rebalanceConfig.isIncludeConsuming()
+ + "&minimizeDataMovement=" + rebalanceConfig.getMinimizeDataMovement()
+ + "&bootstrap=" + rebalanceConfig.isBootstrap() + "&downtime=" +
rebalanceConfig.isDowntime()
+ + "&minAvailableReplicas=" + rebalanceConfig.getMinAvailableReplicas()
+ + "&bestEfforts=" + rebalanceConfig.isBestEfforts()
+ + "&batchSizePerServer=" + rebalanceConfig.getBatchSizePerServer()
+ + "&externalViewCheckIntervalInMs=" +
rebalanceConfig.getExternalViewCheckIntervalInMs()
+ + "&externalViewStabilizationTimeoutInMs=" +
rebalanceConfig.getExternalViewStabilizationTimeoutInMs()
+ + "&updateTargetTier=" + rebalanceConfig.isUpdateTargetTier()
+ + "&heartbeatIntervalInMs=" +
rebalanceConfig.getHeartbeatIntervalInMs()
+ + "&heartbeatTimeoutInMs=" + rebalanceConfig.getHeartbeatTimeoutInMs()
+ + "&maxAttempts=" + rebalanceConfig.getMaxAttempts()
+ + "&retryInitialDelayInMs=" +
rebalanceConfig.getRetryInitialDelayInMs()
+ + "&forceCommit=" + rebalanceConfig.isForceCommit();
+ }
+
+ private String getRebalanceUrl(RebalanceConfig rebalanceConfig, TableType
tableType) {
+ return StringUtil.join("/", getControllerRequestURLBuilder().getBaseUrl(),
"tables", getTableName(), "rebalance")
+ + "?type=" + tableType.toString() + "&" +
getQueryString(rebalanceConfig);
+ }
+
+ @Test
+ public void testForceCommit()
Review Comment:
Are these tests (nearly) identical to the ones added to
`TableRebalanceIntegrationTest`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -508,66 +531,100 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
ZNRecord idealStateRecord = idealState.getRecord();
- if (idealStateRecord.getVersion() != expectedVersion) {
- tableRebalanceLogger.info(
- "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
- + "assignment");
- Map<String, Map<String, String>> oldAssignment = currentAssignment;
- currentAssignment = idealStateRecord.getMapFields();
- expectedVersion = idealStateRecord.getVersion();
-
- // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply the
- // same target instance state map for these segments to the new ideal
state as the target assignment
- boolean segmentsToMoveChanged = false;
- if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
- // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment for
- // new added segments is based on the existing assignment
- segmentsToMoveChanged = true;
- } else {
- for (String segment : segmentsToMove) {
- Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
- Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
- // TODO: Consider allowing segment state change from CONSUMING to
ONLINE
- if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
- tableRebalanceLogger.info(
- "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
- + "assignment based on the new IdealState",
- oldInstanceStateMap, currentInstanceStateMap, segment);
- segmentsToMoveChanged = true;
- break;
+ Map<String, Map<String, String>> nextAssignment;
+ boolean needsRecalculation;
+ boolean shouldForceCommit = forceCommitBeforeMoved;
+
+ do {
+ needsRecalculation = false;
+
+ // Step 1: Handle version mismatch and recalculate if needed
+ if (idealStateRecord.getVersion() != expectedVersion) {
+ tableRebalanceLogger.info(
+ "IdealState version changed while waiting for ExternalView to
converge, re-calculating the target "
+ + "assignment");
+ Map<String, Map<String, String>> oldAssignment = currentAssignment;
+ currentAssignment = idealStateRecord.getMapFields();
+ expectedVersion = idealStateRecord.getVersion();
+
+ // If all the segments to be moved remain unchanged (same instance
state map) in the new ideal state, apply
+ // the same target instance state map for these segments to the new
ideal state as the target assignment
+ boolean segmentsToMoveChanged = false;
+ if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
+ // For StrictRealtimeSegmentAssignment, we need to recompute the
target assignment because the assignment
+ // for new added segments is based on the existing assignment
+ segmentsToMoveChanged = true;
+ } else {
+ for (String segment : segmentsToMove) {
+ Map<String, String> oldInstanceStateMap =
oldAssignment.get(segment);
+ Map<String, String> currentInstanceStateMap =
currentAssignment.get(segment);
+ // TODO: Consider allowing segment state change from CONSUMING
to ONLINE
+ if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
+ tableRebalanceLogger.info(
+ "Segment state changed in IdealState from: {} to: {} for
segment: {}, re-calculating the target "
+ + "assignment based on the new IdealState",
+ oldInstanceStateMap, currentInstanceStateMap, segment);
+ segmentsToMoveChanged = true;
+ break;
+ }
}
}
- }
- if (segmentsToMoveChanged) {
- try {
- // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
- instancePartitionsMap =
- getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
- tableRebalanceLogger).getLeft();
- tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
- minimizeDataMovement, tableRebalanceLogger).getLeft();
- targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
- tierToInstancePartitionsMap, rebalanceConfig);
- } catch (Exception e) {
- onReturnFailure("Caught exception while re-calculating the target
assignment, aborting the rebalance", e,
- tableRebalanceLogger);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
- "Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ if (segmentsToMoveChanged) {
+ try {
+ // Re-calculate the instance partitions in case the instance
configs changed during the rebalance
+ instancePartitionsMap =
+ getInstancePartitionsMap(tableConfig, reassignInstances,
bootstrap, false, minimizeDataMovement,
+ tableRebalanceLogger).getLeft();
+ tierToInstancePartitionsMap =
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, false,
+ minimizeDataMovement, tableRebalanceLogger).getLeft();
+ targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
+ tierToInstancePartitionsMap, rebalanceConfig);
+ } catch (Exception e) {
+ onReturnFailure("Caught exception while re-calculating the
target assignment, aborting the rebalance", e,
+ tableRebalanceLogger);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
+ "Caught exception while re-calculating the target
assignment: " + e, instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
+ }
+ } else {
+ tableRebalanceLogger.info(
+ "No state change found for segments to be moved,
re-calculating the target assignment based on the "
+ + "previous target assignment");
+ Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
+ // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
+ targetAssignment = new TreeMap<>(currentAssignment);
+ for (String segment : segmentsToMove) {
+ targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
}
- } else {
- tableRebalanceLogger.info(
- "No state change found for segments to be moved, re-calculating
the target assignment based on the "
- + "previous target assignment");
- Map<String, Map<String, String>> oldTargetAssignment =
targetAssignment;
- // Other instance assignment code returns a TreeMap to keep it
sorted, doing the same here
- targetAssignment = new TreeMap<>(currentAssignment);
- for (String segment : segmentsToMove) {
- targetAssignment.put(segment, oldTargetAssignment.get(segment));
+ }
+
+ // Step 2: Handle force commit if flag is set, then recalculate if
force commit occurred
+ if (shouldForceCommit) {
+ nextAssignment =
+ getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, tableRebalanceLogger);
+ Set<String> consumingSegmentsToMoveNext =
getMovingConsumingSegments(currentAssignment, nextAssignment);
+
+ if (!consumingSegmentsToMoveNext.isEmpty()) {
Review Comment:
Yeah, that makes sense. Also, regardless of whether we're using batching or
not, and whether we're using strict realtime assignment or not, we should never
see more than one set of consuming segments being force-committed per partition
within a single rebalance, right? Since we're only force-committing consuming
segments that are being moved in this step, even in the upsert / strict
realtime assignment case, this will ensure that the subsequently created
consuming segments will be on the right target instances (because the entire
partition will be moved at a time)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]