J-HowHuang commented on code in PR #16096:
URL: https://github.com/apache/pinot/pull/16096#discussion_r2159588880


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java:
##########
@@ -1379,6 +1388,294 @@ public void testRebalanceJobZkMetadataCleanup()
         _helixResourceManager.getControllerJobZKMetadata(inProgressJobId, 
ControllerJobTypes.TABLE_REBALANCE));
   }
 
+  @Test
+  public void testForceCommit()
+      throws Exception {
+    final String tenantA = "tenantA";
+    final String tenantB = "tenantB";
+
+    TableConfig tableConfig = getRealtimeTableConfig();
+
+    BaseServerStarter serverStarter0 = startOneServer(NUM_SERVERS);
+    BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS + 1);
+    createServerTenant(tenantA, 0, 2);
+
+    BaseServerStarter serverStarter2 = startOneServer(NUM_SERVERS + 2);
+    BaseServerStarter serverStarter3 = startOneServer(NUM_SERVERS + 3);
+    createServerTenant(tenantB, 0, 2);
+
+    // Prepare the table to replicate segments across two servers on tenantA
+    tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA, 
null));
+    tableConfig.getValidationConfig().setReplication("2");
+    updateTableConfig(tableConfig);
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(false);
+    rebalanceConfig.setIncludeConsuming(true);
+
+    String response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    RebalanceResult rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class);
+    RebalanceSummaryResult summary = 
rebalanceResult.getRebalanceSummaryResult();
+    assertEquals(
+        
summary.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+        2);
+    Set<String> originalConsumingSegmentsToMove = summary.getSegmentInfo()
+        .getConsumingSegmentToBeMovedSummary()
+        .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+        .keySet();
+    assertEquals(originalConsumingSegmentsToMove.size(), 2);
+
+    waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+    // test: move segments from tenantA to tenantB
+    tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB, 
null));
+    updateTableConfig(tableConfig);
+
+    rebalanceConfig.setForceCommit(true);
+
+    response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class);
+    assertEquals(
+        
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+        2);
+    summary = rebalanceResult.getRebalanceSummaryResult();
+    originalConsumingSegmentsToMove = summary.getSegmentInfo()
+        .getConsumingSegmentToBeMovedSummary()
+        .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+        .keySet();
+    waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+    response = 
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+    ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap 
consumingSegmentInfoResponse =
+        JsonUtils.stringToObject(response, 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+    LLCSegmentName consumingSegmentNow = new LLCSegmentName(
+        
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+    LLCSegmentName consumingSegmentOriginal =
+        new 
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+    assertEquals(consumingSegmentNow.getSequenceNumber(), 
consumingSegmentOriginal.getSequenceNumber() + 1);
+    
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+        originalConsumingSegmentsToMove.size());
+
+    // test: move segment from tenantB to tenantA with downtime
+
+    tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantA, 
null));
+    updateTableConfig(tableConfig);
+
+    rebalanceConfig.setForceCommit(true);
+    rebalanceConfig.setDowntime(true);
+
+    response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class);
+    summary = rebalanceResult.getRebalanceSummaryResult();
+    originalConsumingSegmentsToMove = summary.getSegmentInfo()
+        .getConsumingSegmentToBeMovedSummary()
+        .getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp()
+        .keySet();
+    assertEquals(
+        
rebalanceResult.getRebalanceSummaryResult().getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+        2);
+
+    waitForTableEVISConverge(getTableName(), 30000);
+
+    response = 
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+    consumingSegmentInfoResponse =
+        JsonUtils.stringToObject(response, 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+    consumingSegmentNow = new LLCSegmentName(
+        
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+    consumingSegmentOriginal = new 
LLCSegmentName(originalConsumingSegmentsToMove.stream().sorted().iterator().next());
+    assertEquals(consumingSegmentNow.getSequenceNumber(), 
consumingSegmentOriginal.getSequenceNumber() + 1);
+    
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+        originalConsumingSegmentsToMove.size());
+
+    // test: move segment from tenantA to tenantB with includeConsuming = false, consuming segment should not be
+    // committed
+
+    tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), tenantB, 
null));
+    updateTableConfig(tableConfig);
+
+    rebalanceConfig.setForceCommit(true);
+    rebalanceConfig.setDowntime(false);
+    rebalanceConfig.setIncludeConsuming(false);
+
+    response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class);
+
+    waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+    response = 
sendGetRequest(getControllerRequestURLBuilder().forTableConsumingSegmentsInfo(getTableName()));
+    consumingSegmentInfoResponse =
+        JsonUtils.stringToObject(response, 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap.class);
+    consumingSegmentOriginal = consumingSegmentNow;
+    consumingSegmentNow = new LLCSegmentName(
+        
consumingSegmentInfoResponse._segmentToConsumingInfoMap.keySet().stream().sorted().iterator().next());
+    // the sequence number should not increase since the consuming segment is not committed
+    assertEquals(consumingSegmentNow.getSequenceNumber(), 
consumingSegmentOriginal.getSequenceNumber());
+    
assertEquals(consumingSegmentInfoResponse._segmentToConsumingInfoMap.size(),
+        originalConsumingSegmentsToMove.size());
+
+    // Resume the table
+    tableConfig.setTenantConfig(new TenantConfig(getBrokerTenant(), 
getServerTenant(), null));
+    tableConfig.getValidationConfig().setReplication("1");
+    updateTableConfig(tableConfig);
+    rebalanceConfig.setForceCommit(false);
+    rebalanceConfig.setMinAvailableReplicas(0);
+    rebalanceConfig.setDowntime(false);
+    rebalanceConfig.setIncludeConsuming(true);
+    response = sendPostRequest(getRebalanceUrl(rebalanceConfig, 
TableType.REALTIME));
+    rebalanceResult = JsonUtils.stringToObject(response, 
RebalanceResult.class);
+    waitForRebalanceToComplete(rebalanceResult.getJobId(), 30000);
+
+    serverStarter0.stop();
+    serverStarter1.stop();
+    serverStarter2.stop();
+    serverStarter3.stop();

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to