This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f6ae06f499 Fixing the bug for Upsert compaction task generator (#12380)
f6ae06f499 is described below
commit f6ae06f499e2fc18934b1ee605d50b7d9006e46b
Author: Seunghyun Lee <[email protected]>
AuthorDate: Wed Feb 7 09:09:05 2024 -0800
Fixing the bug for Upsert compaction task generator (#12380)
- The task generator threw a NullPointerException when a segment
reported in the validDocIds metadata was missing from the map of
completed segments built from the ZK metadata. Such segments are
now logged and skipped instead.
- Added a default value (SNAPSHOT) for the validDocIdsType API query parameter
- Added unit tests
---
.../api/resources/PinotTableRestletResource.java | 3 +-
.../UpsertCompactionTaskGenerator.java | 27 ++++++++---
.../UpsertCompactionTaskGeneratorTest.java | 53 ++++++++++++++++++++++
3 files changed, 76 insertions(+), 7 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index aedc6a7ef8..c0f11a412b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -963,7 +963,7 @@ public class PinotTableRestletResource {
@ApiParam(value = "A list of segments", allowMultiple = true)
@QueryParam("segmentNames")
List<String> segmentNames,
@ApiParam(value = "Valid doc ids type")
- @QueryParam("validDocIdsType") ValidDocIdsType validDocIdsType) {
+ @QueryParam("validDocIdsType") @DefaultValue("SNAPSHOT") ValidDocIdsType
validDocIdsType) {
LOGGER.info("Received a request to fetch aggregate valid doc id metadata
for a table {}", tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == TableType.OFFLINE) {
@@ -977,6 +977,7 @@ public class PinotTableRestletResource {
try {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
+ validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT :
validDocIdsType;
JsonNode segmentsMetadataJson =
tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType,
segmentNames,
validDocIdsType.toString(),
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 6d21d8417f..de24b8a5a6 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -60,9 +60,9 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
public static class SegmentSelectionResult {
- private List<SegmentZKMetadata> _segmentsForCompaction;
+ private final List<SegmentZKMetadata> _segmentsForCompaction;
- private List<String> _segmentsForDeletion;
+ private final List<String> _segmentsForDeletion;
SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction,
List<String> segmentsForDeletion) {
_segmentsForCompaction = segmentsForCompaction;
@@ -96,8 +96,17 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start generating task configs for table: {}",
tableNameWithType);
+ if (tableConfig.getTaskConfig() == null) {
+ LOGGER.warn("Task config is null for table: {}", tableNameWithType);
+ continue;
+ }
+
Map<String, String> taskConfigs =
tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
- List<SegmentZKMetadata> completedSegments =
getCompletedSegments(tableNameWithType, taskConfigs);
+ List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+ // Get completed segments and filter out the segments based on the
buffer time configuration
+ List<SegmentZKMetadata> completedSegments =
+ getCompletedSegments(taskConfigs, allSegments,
System.currentTimeMillis());
if (completedSegments.isEmpty()) {
LOGGER.info("No completed segments were eligible for compaction for
table: {}", tableNameWithType);
@@ -211,6 +220,11 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
// Skip segments if the crc from zk metadata and server does not match.
They may be being reloaded.
SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+ if (segment == null) {
+ LOGGER.warn("Segment {} is not found in the completed segments list,
skipping it for compaction", segmentName);
+ continue;
+ }
+
if (segment.getCrc() !=
Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
LOGGER.warn(
"CRC mismatch for segment: {}, skipping it for compaction
(segmentZKMetadata={}, validDocIdsMetadata={})",
@@ -229,15 +243,16 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
return new SegmentSelectionResult(segmentsForCompaction,
segmentsForDeletion);
}
- private List<SegmentZKMetadata> getCompletedSegments(String
tableNameWithType, Map<String, String> taskConfigs) {
+ @VisibleForTesting
+ public static List<SegmentZKMetadata> getCompletedSegments(Map<String,
String> taskConfigs,
+ List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
List<SegmentZKMetadata> completedSegments = new ArrayList<>();
String bufferPeriod =
taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY,
DEFAULT_BUFFER_PERIOD);
long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
- List<SegmentZKMetadata> allSegments =
getSegmentsZKMetadataForTable(tableNameWithType);
for (SegmentZKMetadata segment : allSegments) {
CommonConstants.Segment.Realtime.Status status = segment.getStatus();
// initial segments selection based on status and age
- if (status.isCompleted() && (segment.getEndTimeMs() <=
(System.currentTimeMillis() - bufferMs))) {
+ if (status.isCompleted() && (segment.getEndTimeMs() <=
(currentTimeInMillis - bufferMs))) {
completedSegments.add(segment);
}
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index d74b03e815..5aacba1f93 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -181,6 +181,52 @@ public class UpsertCompactionTaskGeneratorTest {
assertEquals(maxTasks, 10);
}
+ @Test
+ public void testGetCompletedSegments() {
+ long currentTimeInMillis = System.currentTimeMillis();
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "1d");
+
+ SegmentZKMetadata metadata1 = new SegmentZKMetadata("testTable");
+ metadata1.setEndTime(1694198844776L);
+ metadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ metadata1.setTimeUnit(TimeUnit.MILLISECONDS);
+ SegmentZKMetadata metadata2 = new SegmentZKMetadata("testTable");
+ metadata2.setEndTime(1699639830678L);
+ metadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ metadata2.setTimeUnit(TimeUnit.MILLISECONDS);
+
+ SegmentZKMetadata metadata3 = new SegmentZKMetadata("testTable");
+ metadata3.setEndTime(currentTimeInMillis);
+ metadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ metadata3.setTimeUnit(TimeUnit.MILLISECONDS);
+
+ List<SegmentZKMetadata> segmentZKMetadataList = new ArrayList<>();
+ segmentZKMetadataList.add(metadata1);
+ segmentZKMetadataList.add(metadata2);
+ segmentZKMetadataList.add(metadata3);
+
+ List<SegmentZKMetadata> result =
+ UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs,
segmentZKMetadataList, currentTimeInMillis);
+ Assert.assertEquals(result.size(), 2);
+
+ SegmentZKMetadata metadata4 = new SegmentZKMetadata("testTable");
+ metadata4.setEndTime(currentTimeInMillis -
TimeUtils.convertPeriodToMillis("2d") + 1);
+ metadata4.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+ metadata4.setTimeUnit(TimeUnit.MILLISECONDS);
+ segmentZKMetadataList.add(metadata4);
+
+ result =
+ UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs,
segmentZKMetadataList, currentTimeInMillis);
+ Assert.assertEquals(result.size(), 3);
+
+ // Check the boundary condition for buffer time period based filtering
+ taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "2d");
+ result =
+ UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs,
segmentZKMetadataList, currentTimeInMillis);
+ Assert.assertEquals(result.size(), 2);
+ }
+
@Test
public void testProcessValidDocIdsMetadata()
throws IOException {
@@ -190,10 +236,17 @@ public class UpsertCompactionTaskGeneratorTest {
+ _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" :
0," + "\"totalInvalidDocs\" : 10,"
+ "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\",
" + "\"segmentCrc\" : \""
+ _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
+
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
JsonUtils.stringToObject(json, new
TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
+
UpsertCompactionTaskGenerator.SegmentSelectionResult
segmentSelectionResult =
+
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new
HashMap<>(),
+ validDocIdsMetadataInfo);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
+
+ segmentSelectionResult =
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs,
_completedSegmentsMap,
validDocIdsMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]