This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4f1c40c6f1d feat: Enhance PurgeTask to automatically delete empty
segments (#16368)
4f1c40c6f1d is described below
commit 4f1c40c6f1d84a441cf5998f7b4c3ab0f00880ac
Author: avshenuk <[email protected]>
AuthorDate: Fri Jul 25 02:17:57 2025 +0200
feat: Enhance PurgeTask to automatically delete empty segments (#16368)
---
.../tests/PurgeMinionClusterIntegrationTest.java | 178 ++++++++++++++++++++-
.../minion/tasks/purge/PurgeTaskGenerator.java | 48 +++++-
2 files changed, 221 insertions(+), 5 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index 19459894c72..9aab0fc5771 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -61,6 +61,8 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
private static final String PURGE_DELTA_PASSED_TABLE = "myTable2";
private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE =
"myTable4";
+ private static final String PURGE_ALL_RECORDS_TABLE = "myTable5";
+ private static final String PURGE_REALTIME_LAST_SEGMENT_TABLE = "myTable6";
protected PinotHelixTaskResourceManager _helixTaskResourceManager;
protected PinotTaskManager _taskManager;
@@ -82,11 +84,15 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
startServer();
startMinion();
- List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE,
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
- PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
+ // Start Kafka for realtime table test
+ startKafka();
+
+ List<String> allOfflineTables =
+ List.of(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE,
PURGE_DELTA_NOT_PASSED_TABLE,
+ PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE,
PURGE_ALL_RECORDS_TABLE);
Schema schema = null;
TableConfig tableConfig = null;
- for (String tableName : allTables) {
+ for (String tableName : allOfflineTables) {
// create and upload schema
schema = createSchema();
schema.setSchemaName(tableName);
@@ -107,10 +113,24 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
_segmentTarDir);
// Upload segments for all tables
- for (String tableName : allTables) {
+ for (String tableName : allOfflineTables) {
uploadSegments(tableName, _segmentTarDir);
}
+ // Set up realtime table with purge task configuration
+ schema = createSchema();
+ schema.setSchemaName(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+ addSchema(schema);
+ // Create realtime table config with purge task
+ TableConfig realtimeTableConfig =
createRealtimeTableConfig(avroFiles.get(0));
+ realtimeTableConfig.setTableName(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+ realtimeTableConfig.setTaskConfig(getPurgeTaskConfig());
+ addTableConfig(realtimeTableConfig);
+ // Push data into Kafka to create LLC segments
+ pushAvroIntoKafka(avroFiles);
+ // Wait for all documents loaded
+ waitForDocsLoaded(600_000L, true, PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
setRecordPurger();
_helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
@@ -151,6 +171,10 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
if (tableNames.contains(rawTableName)) {
return row -> row.getValue("ArrTime").equals(1);
+ } else if (PURGE_ALL_RECORDS_TABLE.equals(rawTableName) ||
PURGE_REALTIME_LAST_SEGMENT_TABLE.equals(
+ rawTableName)) {
+ // Purge ALL records to test segment deletion
+ return row -> true;
} else {
return null;
}
@@ -381,6 +405,151 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
verifyTableDelete(offlineTableName);
}
+ /**
+ * Test that segments are automatically deleted when all records are purged
+ */
+ @Test
+ public void testSegmentDeletionWhenAllRecordsPurged()
+ throws Exception {
+ // Expected behavior:
+ // 1. First run: All records in segments are purged (RecordPurger returns
true for all records)
+ // 2. First run: Segments become empty but are still present with
totalDocs = 0
+ // 3. Second run: Empty segments are automatically deleted by
PurgeTaskGenerator during task generation
+ // 4. Verify that segments are removed from the table
+
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_ALL_RECORDS_TABLE);
+
+ // Get initial segment count
+ List<SegmentZKMetadata> initialSegments =
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+ int initialSegmentCount = initialSegments.size();
+ assertTrue(initialSegmentCount > 0, "Table should have segments
initially");
+
+ // First run: Schedule purge task to create empty segments
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertTrue(_helixTaskResourceManager.getTaskQueues()
+
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
+
+ // Wait for first task to complete
+ waitForTaskToComplete();
+
+ // Verify table now has no data but segments still exist (empty segments)
+ TestUtils.waitForCondition(aVoid ->
getCurrentCountStarResult(PURGE_ALL_RECORDS_TABLE) == 0, 60_000L,
+ "Failed to get expected purged records");
+ List<SegmentZKMetadata> segmentsAfterFirstRun =
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+ assertEquals(segmentsAfterFirstRun.size(), initialSegmentCount,
+ "Segments should still exist after first purge run");
+
+ // Verify segments have totalDocs = 0
+ for (SegmentZKMetadata segment : segmentsAfterFirstRun) {
+ assertEquals(segment.getTotalDocs(), 0L, "All segments should have zero
documents after purging");
+ }
+
+ // Second run: Schedule purge task again - this should delete the empty
segments during task generation
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+ // Wait for second task to complete (if any tasks were generated)
+ waitForTaskToComplete();
+
+ // Verify that all empty segments have been deleted
+ TestUtils.waitForCondition(aVoid -> {
+ List<SegmentZKMetadata> remainingSegments =
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+ return remainingSegments.isEmpty();
+ }, 60_000L, "Expected all empty segments to be deleted after second purge
run");
+
+ // Verify table still has no data
+ assertEquals(getCurrentCountStarResult(PURGE_ALL_RECORDS_TABLE), 0);
+
+ // Drop the table
+ dropOfflineTable(PURGE_ALL_RECORDS_TABLE);
+
+ // Check if the task metadata is cleaned up on table deletion
+ verifyTableDelete(offlineTableName);
+ }
+
+ /**
+ * Test that empty segments are preserved when they are the last segment of
a partition in realtime tables.
+ * This test specifically covers the edge case where empty segments should
only
+ * be allowed when they are needed to mark the end of a stream partition
(e.g. Kinesis).
+ */
+ @Test
+ public void testRealtimeLastSegmentPreservation()
+ throws Exception {
+ // Expected behavior:
+ // 1. First run: All records in completed segments are purged
(RecordPurger returns true for all records)
+ // 2. First run: Completed segments become empty but are still present
with totalDocs = 0
+ // 3. Second run: Empty non-last completed segments are deleted, but last
segments per partition are preserved
+ // 4. Verify that consuming segments and last empty completed segments per
partition remain
+
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
+ // Get initial segment count
+ List<SegmentZKMetadata> initialSegments =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+ int initialSegmentCount = initialSegments.size();
+ assertTrue(initialSegmentCount > 0, "Table should have segments
initially");
+
+ // First run: Schedule purge task to create empty segments
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+ // Wait for first task to complete
+ waitForTaskToComplete();
+
+ // Calculate expected remaining records after purging completed segments
+ // Expected remaining = totalRecords % recordsPerSegmentPerPartition
+ long expectedRemainingRecords = getCountStarResult() %
(getRealtimeSegmentFlushSize() / getNumKafkaPartitions());
+ TestUtils.waitForCondition(
+ aVoid -> getCurrentCountStarResult(PURGE_REALTIME_LAST_SEGMENT_TABLE)
== expectedRemainingRecords,
+ 60_000L, "Failed to get expected purged records");
+ List<SegmentZKMetadata> segmentsAfterFirstRun =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+ assertEquals(segmentsAfterFirstRun.size(), initialSegmentCount,
+ "Segments should still exist after first purge run");
+
+ // Verify segments have totalDocs = 0 (for completed segments)
+ for (SegmentZKMetadata segment : segmentsAfterFirstRun) {
+ if (segment.getStatus().isCompleted()) {
+ assertEquals(segment.getTotalDocs(), 0L, "All completed segments
should have zero documents after purging");
+ }
+ }
+
+ // Second run: Schedule purge task again - this should delete empty
non-last segments but preserve last segments
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
+
+ // Wait for second task to complete
+ waitForTaskToComplete();
+
+ TestUtils.waitForCondition(aVoid -> {
+ // Verify that we have the expected number of segments remaining:
+ // - 1 consuming segment per partition (should not be deleted)
+ // - 1 completed empty segment per partition (last segment, should be
preserved)
+ List<SegmentZKMetadata> remainingSegments =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+ long consumingSegments = remainingSegments.stream()
+ .filter(s -> !s.getStatus().isCompleted())
+ .count();
+ long completedSegments = remainingSegments.stream()
+ .filter(s -> s.getStatus().isCompleted())
+ .count();
+
+ // We should have: 1 consuming segment per partition + 1 last empty
completed segment per partition
+ return consumingSegments == getNumKafkaPartitions() && completedSegments
== getNumKafkaPartitions();
+ }, 60_000L, "Expected all but last empty completed segments to be deleted
after second purge run");
+
+ // Verify table still has expected remaining data (from consuming segments)
+ assertEquals(getCurrentCountStarResult(PURGE_REALTIME_LAST_SEGMENT_TABLE),
expectedRemainingRecords);
+
+ // Drop the realtime table
+ dropRealtimeTable(PURGE_REALTIME_LAST_SEGMENT_TABLE);
+
+ // Verify cleanup
+ verifyTableDelete(realtimeTableName);
+ }
+
protected void verifyTableDelete(String tableNameWithType) {
TestUtils.waitForCondition(input -> {
// Check if the segment lineage is cleaned up
@@ -416,6 +585,7 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
stopServer();
stopBroker();
stopController();
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
index 3e020095974..bc0a6f1098f 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/purge/PurgeTaskGenerator.java
@@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorUtils;
import org.apache.pinot.core.common.MinionConstants;
@@ -96,7 +98,6 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
List<SegmentZKMetadata> notpurgedSegmentsZKMetadata = new ArrayList<>();
for (SegmentZKMetadata segmentMetadata : segmentsZKMetadata) {
-
if (segmentMetadata.getCustomMap() != null &&
segmentMetadata.getCustomMap()
.containsKey(MinionConstants.PurgeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX)) {
purgedSegmentsZKMetadata.add(segmentMetadata);
@@ -113,8 +114,24 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
int tableNumTasks = 0;
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(MinionConstants.PurgeTask.TASK_TYPE,
_clusterInfoAccessor);
+ List<String> segmentsForDeletion = new ArrayList<>();
+ // For realtime tables, build a map of partition to latest segment to
avoid deleting last segments
+ Set<String> lastLLCSegmentPerPartition = new HashSet<>();
+ if (tableConfig.getTableType() == TableType.REALTIME) {
+ lastLLCSegmentPerPartition =
getLastLLCSegmentPerPartition(segmentsZKMetadata);
+ }
for (SegmentZKMetadata segmentZKMetadata : notpurgedSegmentsZKMetadata) {
String segmentName = segmentZKMetadata.getSegmentName();
+ if (segmentZKMetadata.getTotalDocs() == 0L) {
+ // Check if this empty segment is the last segment of a partition
+ if (lastLLCSegmentPerPartition.contains(segmentName)) {
+ LOGGER.info("Skipping deletion of empty segment {} as it is the
last segment of its partition",
+ segmentName);
+ } else {
+ segmentsForDeletion.add(segmentName);
+ }
+ continue;
+ }
Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
Long tsLastPurge;
if (segmentZKMetadata.getCustomMap() != null) {
@@ -141,9 +158,38 @@ public class PurgeTaskGenerator extends BaseTaskGenerator {
pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
tableNumTasks++;
}
+ if (!segmentsForDeletion.isEmpty()) {
+
_clusterInfoAccessor.getPinotHelixResourceManager().deleteSegments(tableName,
segmentsForDeletion,
+ "0d");
+ LOGGER.info(
+ "Deleted segments containing no records for table: {}, number of
segments to be deleted: {}",
+ tableName, segmentsForDeletion.size());
+ }
LOGGER.info("Finished generating {} tasks configs for table: {} " + "for
task: {}", tableNumTasks, tableName,
taskType);
}
return pinotTaskConfigs;
}
+
+ private Set<String> getLastLLCSegmentPerPartition(List<SegmentZKMetadata>
segmentsZKMetadata) {
+ Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+ // Skip UPLOADED segments that don't conform to the LLC segment name
+ LLCSegmentName llcSegmentName =
LLCSegmentName.of(segmentZKMetadata.getSegmentName());
+ if (llcSegmentName != null) {
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
(k, latestLLCSegmentName) -> {
+ if (latestLLCSegmentName == null
+ || llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
+ return llcSegmentName;
+ } else {
+ return latestLLCSegmentName;
+ }
+ });
+ }
+ }
+ Set<String> lastLLCSegmentPerPartition = new HashSet<>();
+ latestLLCSegmentNameMap.forEach((ignored, value) ->
+ lastLLCSegmentPerPartition.add(value.getSegmentName()));
+ return lastLLCSegmentPerPartition;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org