This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f1c40c6f1d feat: Enhance PurgeTask to automatically delete empty segments (#16368)
4f1c40c6f1d is described below

commit 4f1c40c6f1d84a441cf5998f7b4c3ab0f00880ac
Author: avshenuk <[email protected]>
AuthorDate: Fri Jul 25 02:17:57 2025 +0200

    feat: Enhance PurgeTask to automatically delete empty segments (#16368)
---
 .../tests/PurgeMinionClusterIntegrationTest.java   | 178 ++++++++++++++++++++-
 .../minion/tasks/purge/PurgeTaskGenerator.java     |  48 +++++-
 2 files changed, 221 insertions(+), 5 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 19459894c72..9aab0fc5771 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -61,6 +61,8 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
   private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
   private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = 
"myTable4";
+  private static final String PURGE_ALL_RECORDS_TABLE = "myTable5";
+  private static final String PURGE_REALTIME_LAST_SEGMENT_TABLE = "myTable6";
 
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
@@ -82,11 +84,15 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     startServer();
     startMinion();
 
-    List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, 
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
-        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+    // Start Kafka for realtime table test
+    startKafka();
+
+    List<String> allOfflineTables =
+        List.of(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, 
PURGE_DELTA_NOT_PASSED_TABLE,
+            PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE, 
PURGE_ALL_RECORDS_TABLE);
     Schema schema = null;
     TableConfig tableConfig = null;
-    for (String tableName : allTables) {
+    for (String tableName : allOfflineTables) {
       // create and upload schema
       schema = createSchema();
       schema.setSchemaName(tableName);
@@ -107,10 +113,24 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
         _segmentTarDir);
 
     // Upload segments for all tables
-    for (String tableName : allTables) {
+    for (String tableName : allOfflineTables) {
       uploadSegments(tableName, _segmentTarDir);
     }
 
+    // Set up realtime table with purge task configuration
+    schema = createSchema();
+    schema.setSchemaName(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+    addSchema(schema);
+    // Create realtime table config with purge task
+    TableConfig realtimeTableConfig = 
createRealtimeTableConfig(avroFiles.get(0));
+    realtimeTableConfig.setTableName(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+    realtimeTableConfig.setTaskConfig(getPurgeTaskConfig());
+    addTableConfig(realtimeTableConfig);
+    // Push data into Kafka to create LLC segments
+    pushAvroIntoKafka(avroFiles);
+    // Wait for all documents loaded
+    waitForDocsLoaded(600_000L, true, PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
     setRecordPurger();
     _helixTaskResourceManager = 
_controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
@@ -151,6 +171,10 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
               PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
       if (tableNames.contains(rawTableName)) {
         return row -> row.getValue("ArrTime").equals(1);
+      } else if (PURGE_ALL_RECORDS_TABLE.equals(rawTableName) || 
PURGE_REALTIME_LAST_SEGMENT_TABLE.equals(
+          rawTableName)) {
+        // Purge ALL records to test segment deletion
+        return row -> true;
       } else {
         return null;
       }
@@ -381,6 +405,151 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     verifyTableDelete(offlineTableName);
   }
 
+  /**
+   * Test that segments are automatically deleted when all records are purged
+   */
+  @Test
+  public void testSegmentDeletionWhenAllRecordsPurged()
+      throws Exception {
+    // Expected behavior:
+    // 1. First run: All records in segments are purged (RecordPurger returns 
true for all records)
+    // 2. First run: Segments become empty but are still present with 
totalDocs = 0
+    // 3. Second run: Empty segments are automatically deleted by 
PurgeTaskGenerator during task generation
+    // 4. Verify that segments are removed from the table
+
+    String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_ALL_RECORDS_TABLE);
+
+    // Get initial segment count
+    List<SegmentZKMetadata> initialSegments = 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+    int initialSegmentCount = initialSegments.size();
+    assertTrue(initialSegmentCount > 0, "Table should have segments 
initially");
+
+    // First run: Schedule purge task to create empty segments
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertTrue(_helixTaskResourceManager.getTaskQueues()
+        
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+
+    // Wait for first task to complete
+    waitForTaskToComplete();
+
+    // Verify table now has no data but segments still exist (empty segments)
+    TestUtils.waitForCondition(aVoid -> 
getCurrentCountStarResult(PURGE_ALL_RECORDS_TABLE) == 0, 60_000L,
+        "Failed to get expected purged records");
+    List<SegmentZKMetadata> segmentsAfterFirstRun = 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+    assertEquals(segmentsAfterFirstRun.size(), initialSegmentCount,
+        "Segments should still exist after first purge run");
+
+    // Verify segments have totalDocs = 0
+    for (SegmentZKMetadata segment : segmentsAfterFirstRun) {
+      assertEquals(segment.getTotalDocs(), 0L, "All segments should have zero 
documents after purging");
+    }
+
+    // Second run: Schedule purge task again - this should delete the empty 
segments during task generation
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // Wait for second task to complete (if any tasks were generated)
+    waitForTaskToComplete();
+
+    // Verify that all empty segments have been deleted
+    TestUtils.waitForCondition(aVoid -> {
+      List<SegmentZKMetadata> remainingSegments = 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+      return remainingSegments.isEmpty();
+    }, 60_000L, "Expected all empty segments to be deleted after second purge 
run");
+
+    // Verify table still has no data
+    assertEquals(getCurrentCountStarResult(PURGE_ALL_RECORDS_TABLE), 0);
+
+    // Drop the table
+    dropOfflineTable(PURGE_ALL_RECORDS_TABLE);
+
+    // Check if the task metadata is cleaned up on table deletion
+    verifyTableDelete(offlineTableName);
+  }
+
+  /**
+   * Test that empty segments are preserved when they are the last segment of 
a partition in realtime tables.
+   * This test specifically covers the edge case where empty segments should 
only
+   * be allowed when they are needed to mark the end of a stream partition 
(e.g. Kinesis).
+   */
+  @Test
+  public void testRealtimeLastSegmentPreservation()
+      throws Exception {
+    // Expected behavior:
+    // 1. First run: All records in completed segments are purged 
(RecordPurger returns true for all records)
+    // 2. First run: Completed segments become empty but are still present 
with totalDocs = 0
+    // 3. Second run: Empty non-last completed segments are deleted, but last 
segments per partition are preserved
+    // 4. Verify that consuming segments and last empty completed segments per 
partition remain
+
+    String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
+    // Get initial segment count
+    List<SegmentZKMetadata> initialSegments = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+    int initialSegmentCount = initialSegments.size();
+    assertTrue(initialSegmentCount > 0, "Table should have segments 
initially");
+
+    // First run: Schedule purge task to create empty segments
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(realtimeTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // Wait for first task to complete
+    waitForTaskToComplete();
+
+    // Calculate expected remaining records after purging completed segments
+    // Expected remaining = totalRecords % recordsPerSegmentPerPartition
+    long expectedRemainingRecords = getCountStarResult() % 
(getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+    TestUtils.waitForCondition(
+        aVoid -> getCurrentCountStarResult(PURGE_REALTIME_LAST_SEGMENT_TABLE) 
== expectedRemainingRecords,
+        60_000L, "Failed to get expected purged records");
+    List<SegmentZKMetadata> segmentsAfterFirstRun = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+    assertEquals(segmentsAfterFirstRun.size(), initialSegmentCount,
+        "Segments should still exist after first purge run");
+
+    // Verify segments have totalDocs = 0 (for completed segments)
+    for (SegmentZKMetadata segment : segmentsAfterFirstRun) {
+      if (segment.getStatus().isCompleted()) {
+        assertEquals(segment.getTotalDocs(), 0L, "All completed segments 
should have zero documents after purging");
+      }
+    }
+
+    // Second run: Schedule purge task again - this should delete empty 
non-last segments but preserve last segments
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(realtimeTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+    // Wait for second task to complete
+    waitForTaskToComplete();
+
+    TestUtils.waitForCondition(aVoid -> {
+      // Verify that we have the expected number of segments remaining:
+      // - 1 consuming segment per partition (should not be deleted)
+      // - 1 completed empty segment per partition (last segment, should be 
preserved)
+      List<SegmentZKMetadata> remainingSegments = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+      long consumingSegments = remainingSegments.stream()
+          .filter(s -> !s.getStatus().isCompleted())
+          .count();
+      long completedSegments = remainingSegments.stream()
+          .filter(s -> s.getStatus().isCompleted())
+          .count();
+
+      // We should have: 1 consuming segment per partition + 1 last empty 
completed segment per partition
+      return consumingSegments == getNumKafkaPartitions() && completedSegments 
== getNumKafkaPartitions();
+    }, 60_000L, "Expected all but last empty completed segments to be deleted 
after second purge run");
+
+    // Verify table still has expected remaining data (from consuming segments)
+    assertEquals(getCurrentCountStarResult(PURGE_REALTIME_LAST_SEGMENT_TABLE), 
expectedRemainingRecords);
+
+    // Drop the realtime table
+    dropRealtimeTable(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
+    // Verify cleanup
+    verifyTableDelete(realtimeTableName);
+  }
+
   protected void verifyTableDelete(String tableNameWithType) {
     TestUtils.waitForCondition(input -> {
       // Check if the segment lineage is cleaned up
@@ -416,6 +585,7 @@ public class PurgeMinionClusterIntegrationTest extends BaseClusterIntegrationTes
     stopServer();
     stopBroker();
     stopController();
+    stopKafka();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
   }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index 3e020095974..bc0a6f1098f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.common.data.Segment;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
 import 
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
 import org.apache.pinot.core.common.MinionConstants;
@@ -96,7 +98,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
       List<SegmentZKMetadata> notpurgedSegmentsZKMetadata = new ArrayList<>();
 
       for (SegmentZKMetadata segmentMetadata : segmentsZKMetadata) {
-
         if (segmentMetadata.getCustomMap() != null && 
segmentMetadata.getCustomMap()
             .containsKey(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX)) {
           purgedSegmentsZKMetadata.add(segmentMetadata);
@@ -113,8 +114,24 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
       int tableNumTasks = 0;
       Set<Segment> runningSegments =
           
TaskGeneratorUtils.getRunningSegments(MinionConstants.PurgeTask.TASK_TYPE, 
_clusterInfoAccessor);
+      List<String> segmentsForDeletion = new ArrayList<>();
+      // For realtime tables, build a map of partition to latest segment to 
avoid deleting last segments
+      Set<String> lastLLCSegmentPerPartition = new HashSet<>();
+      if (tableConfig.getTableType() == TableType.REALTIME) {
+        lastLLCSegmentPerPartition = 
getLastLLCSegmentPerPartition(segmentsZKMetadata);
+      }
       for (SegmentZKMetadata segmentZKMetadata : notpurgedSegmentsZKMetadata) {
         String segmentName = segmentZKMetadata.getSegmentName();
+        if (segmentZKMetadata.getTotalDocs() == 0L) {
+          // Check if this empty segment is the last segment of a partition
+          if (lastLLCSegmentPerPartition.contains(segmentName)) {
+            LOGGER.info("Skipping deletion of empty segment {} as it is the 
last segment of its partition",
+                segmentName);
+          } else {
+            segmentsForDeletion.add(segmentName);
+          }
+          continue;
+        }
         Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
         Long tsLastPurge;
         if (segmentZKMetadata.getCustomMap() != null) {
@@ -141,9 +158,38 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
         pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
         tableNumTasks++;
       }
+      if (!segmentsForDeletion.isEmpty()) {
+        
_clusterInfoAccessor.getPinotHelixResourceManager().deleteSegments(tableName, 
segmentsForDeletion,
+            "0d");
+        LOGGER.info(
+            "Deleted segments containing no records for table: {}, number of 
segments to be deleted: {}",
+            tableName, segmentsForDeletion.size());
+      }
       LOGGER.info("Finished generating {} tasks configs for table: {} " + "for 
task: {}", tableNumTasks, tableName,
           taskType);
     }
     return pinotTaskConfigs;
   }
+
+  private Set<String> getLastLLCSegmentPerPartition(List<SegmentZKMetadata> 
segmentsZKMetadata) {
+    Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
+    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+      // Skip UPLOADED segments that don't conform to the LLC segment name
+      LLCSegmentName llcSegmentName = 
LLCSegmentName.of(segmentZKMetadata.getSegmentName());
+      if (llcSegmentName != null) {
+        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), 
(k, latestLLCSegmentName) -> {
+          if (latestLLCSegmentName == null
+              || llcSegmentName.getSequenceNumber() > 
latestLLCSegmentName.getSequenceNumber()) {
+            return llcSegmentName;
+          } else {
+            return latestLLCSegmentName;
+          }
+        });
+      }
+    }
+    Set<String> lastLLCSegmentPerPartition = new HashSet<>();
+    latestLLCSegmentNameMap.forEach((ignored, value) ->
+        lastLLCSegmentPerPartition.add(value.getSegmentName()));
+    return lastLLCSegmentPerPartition;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to