This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8f117b9224b Fix RefreshSegmentMinionClusterIntegrationTest.checkColumnAddition() (#16125)
8f117b9224b is described below
commit 8f117b9224b1a019087655d75c40e20ce3adcf46
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 17 14:08:13 2025 -0600
Fix RefreshSegmentMinionClusterIntegrationTest.checkColumnAddition() (#16125)
---
...RefreshSegmentMinionClusterIntegrationTest.java | 79 ++++++----------------
1 file changed, 20 insertions(+), 59 deletions(-)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index 8f76ce7f6eb..c0921838682 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -20,12 +20,12 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
@@ -36,17 +36,14 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.spi.index.StandardIndexes;
-import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -274,78 +271,40 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
@Test(priority = 4)
public void checkColumnAddition() throws Exception {
- long numTotalDocs = getCountStarResult();
Schema schema = createSchema();
- schema.addField(new MetricFieldSpec("NewAddedIntMetric",
FieldSpec.DataType.INT, 1));
- schema.addField(new MetricFieldSpec("NewAddedLongMetric",
FieldSpec.DataType.LONG, 1));
- schema.addField(new MetricFieldSpec("NewAddedFloatMetric",
FieldSpec.DataType.FLOAT));
- schema.addField(new MetricFieldSpec("NewAddedDoubleMetric",
FieldSpec.DataType.DOUBLE));
- schema.addField(new MetricFieldSpec("NewAddedBigDecimalMetric",
FieldSpec.DataType.BIG_DECIMAL));
- schema.addField(new MetricFieldSpec("NewAddedBytesMetric",
FieldSpec.DataType.BYTES));
- schema.addField(new DimensionFieldSpec("NewAddedMVIntDimension",
FieldSpec.DataType.INT, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVLongDimension",
FieldSpec.DataType.LONG, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVFloatDimension",
FieldSpec.DataType.FLOAT, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVDoubleDimension",
FieldSpec.DataType.DOUBLE, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVBooleanDimension",
FieldSpec.DataType.BOOLEAN, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVTimestampDimension",
FieldSpec.DataType.TIMESTAMP, false));
- schema.addField(new DimensionFieldSpec("NewAddedMVStringDimension",
FieldSpec.DataType.STRING, false));
- schema.addField(new DimensionFieldSpec("NewAddedSVJSONDimension",
FieldSpec.DataType.JSON, true));
- schema.addField(new DimensionFieldSpec("NewAddedSVBytesDimension",
FieldSpec.DataType.BYTES, true));
- schema.addField(
- new DateTimeFieldSpec("NewAddedDerivedHoursSinceEpoch",
FieldSpec.DataType.INT, "EPOCH|HOURS", "1:DAYS"));
- schema.addField(
- new DateTimeFieldSpec("NewAddedDerivedTimestamp",
FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS"));
- schema.addField(new
DimensionFieldSpec("NewAddedDerivedSVBooleanDimension",
FieldSpec.DataType.BOOLEAN, true));
- schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension",
FieldSpec.DataType.STRING, false));
schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs",
FieldSpec.DataType.INT, false));
schema.addField(new
DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString",
FieldSpec.DataType.STRING, false));
schema.addField(new
DimensionFieldSpec("NewAddedRawDerivedStringDimension",
FieldSpec.DataType.STRING, true));
schema.addField(new DimensionFieldSpec("NewAddedRawDerivedMVIntDimension",
FieldSpec.DataType.INT, false));
- schema.addField(new DimensionFieldSpec("NewAddedDerivedMVDoubleDimension",
FieldSpec.DataType.DOUBLE, false));
schema.addField(new DimensionFieldSpec("NewAddedDerivedNullString",
FieldSpec.DataType.STRING, true, "nil"));
schema.setEnableColumnBasedNullHandling(true);
- addSchema(schema);
+ updateSchema(schema);
TableConfig tableConfig = getOfflineTableConfig();
List<TransformConfig> transformConfigs =
- Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch",
"DaysSinceEpoch * 24"),
- new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch *
24 * 3600 * 1000"),
- new TransformConfig("NewAddedDerivedSVBooleanDimension",
"ActualElapsedTime > 0"),
- new TransformConfig("NewAddedDerivedMVStringDimension",
"split(DestCityName, ', ')"),
- new TransformConfig("NewAddedDerivedDivAirportSeqIDs",
"DivAirportSeqIDs"),
+ List.of(new TransformConfig("NewAddedDerivedDivAirportSeqIDs",
"DivAirportSeqIDs"),
new TransformConfig("NewAddedDerivedDivAirportSeqIDsString",
"DivAirportSeqIDs"),
new TransformConfig("NewAddedRawDerivedStringDimension",
"reverse(DestCityName)"),
new TransformConfig("NewAddedRawDerivedMVIntDimension",
"ActualElapsedTime"),
- new TransformConfig("NewAddedDerivedMVDoubleDimension",
"ArrDelayMinutes"),
new TransformConfig("NewAddedDerivedNullString", "caseWhen(true,
null, null)"));
-
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
-
- // Ensure that we can reload segments with a new raw derived column
-
tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedStringDimension");
-
tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedMVIntDimension");
- List<FieldConfig> fieldConfigList = new ArrayList<>();
- fieldConfigList.add(
- new FieldConfig("NewAddedDerivedDivAirportSeqIDs",
FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
- FieldConfig.CompressionCodec.MV_ENTRY_DICT, null));
- fieldConfigList.add(new
FieldConfig("NewAddedDerivedDivAirportSeqIDsString",
FieldConfig.EncodingType.DICTIONARY,
- Collections.emptyList(), FieldConfig.CompressionCodec.MV_ENTRY_DICT,
null));
+ List<String> noDictionaryColumns =
tableConfig.getIndexingConfig().getNoDictionaryColumns();
+ assertNotNull(noDictionaryColumns);
+ noDictionaryColumns.add("NewAddedRawDerivedStringDimension");
+ noDictionaryColumns.add("NewAddedRawDerivedMVIntDimension");
updateTableConfig(tableConfig);
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
- assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
- .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ assertNotNull(_taskManager.scheduleTasks(new
TaskSchedulingContext().setTablesToSchedule(Set.of(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
- .setTablesToSchedule(Collections.singleton(offlineTableName))
-
.setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
- _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new
TaskSchedulingContext().setTablesToSchedule(Set.of(offlineTableName))
+
.setTasksToSchedule(Set.of(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
// Check that metadata contains processed times.
@@ -356,22 +315,24 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
assertTrue(customMap.containsKey(refreshKey));
}
+ String referenceQuery = "SELECT COUNT(*) FROM mytable WHERE
ActualElapsedTime > 150";
+ int referenceCount =
postQuery(referenceQuery).get("resultTable").get("rows").get(0).get(0).asInt();
waitForServerSegmentDownload(aVoid -> {
try {
- String query = "SELECT COUNT(*) FROM mytable WHERE NewAddedIntMetric =
1";
+ String query = "SELECT COUNT(*) FROM mytable WHERE
NewAddedRawDerivedMVIntDimension > 150";
JsonNode response = postQuery(query);
- return response.get("resultTable").get("rows").get(0).get(0).asLong()
== numTotalDocs;
+ return response.get("resultTable").get("rows").get(0).get(0).asLong()
== referenceCount;
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// Verify the index sizes
- JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
-
_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
- List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs",
"NewAddedDerivedDivAirportSeqIDsString",
- "NewAddedRawDerivedStringDimension",
"NewAddedRawDerivedMVIntDimension",
- "NewAddedDerivedNullString"))))
+ List<String> columns =
+ List.of("DivAirportSeqIDs", "NewAddedDerivedDivAirportSeqIDs",
"NewAddedDerivedDivAirportSeqIDsString",
+ "NewAddedRawDerivedStringDimension",
"NewAddedRawDerivedMVIntDimension", "NewAddedDerivedNullString");
+ JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(
+
sendGetRequest(_controllerRequestURLBuilder.forTableAggregateMetadata(getTableName(),
columns)))
.get("columnIndexSizeMap");
assertEquals(columnIndexSizeMap.size(), 6);
JsonNode originalColumnIndexSizes =
columnIndexSizeMap.get("DivAirportSeqIDs");
@@ -389,7 +350,7 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
assertTrue(
derivedStringColumnIndexSizes.get(StandardIndexes.DICTIONARY_ID).asDouble() >
originalColumnDictionarySize);
- // Both derived columns should have smaller forward index size than the
original column because of compression
+ // Both derived columns should have same forward index size
double derivedColumnForwardIndexSize =
derivedColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble();
assertEquals(derivedStringColumnIndexSizes.get(StandardIndexes.FORWARD_ID).asDouble(),
derivedColumnForwardIndexSize);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]