This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fd448f44266 Misc fixes for virtual column support (#16121)
fd448f44266 is described below
commit fd448f44266cfbef87e545c9933c120a74f21d16
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 17 14:16:56 2025 -0600
Misc fixes for virtual column support (#16121)
---
.../helix/core/PinotHelixResourceManager.java | 56 ++++++++++++----------
.../core/data/manager/BaseTableDataManager.java | 5 +-
.../core/operator/filter/MapFilterOperator.java | 14 ++++--
.../apache/pinot/core/startree/StarTreeUtils.java | 24 ++++++++--
.../tests/OfflineClusterIntegrationTest.java | 25 ++++++++++
.../segment/readers/PinotSegmentColumnReader.java | 1 -
6 files changed, 86 insertions(+), 39 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d377ed190f8..1d01e23525b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1569,11 +1569,9 @@ public class PinotHelixResourceManager {
reloadAllSegments(tableNameWithType, false, null);
}
} else {
- // Send schema refresh message to all tables that use this schema
+ LOGGER.info("Refreshing schema for tables with name: {}", schemaName);
for (String tableNameWithType : tableNamesWithType) {
- LOGGER.info("Sending updated schema message for table: {}",
tableNameWithType);
- sendTableConfigSchemaRefreshMessage(tableNameWithType,
getServerInstancesForTable(tableNameWithType,
- TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
+ sendTableConfigSchemaRefreshMessage(tableNameWithType);
}
}
} catch (TableNotFoundException e) {
@@ -3155,6 +3153,7 @@ public class PinotHelixResourceManager {
}
}
+ /// Sends table config refresh message to brokers.
private void sendTableConfigRefreshMessage(String tableNameWithType) {
TableConfigRefreshMessage tableConfigRefreshMessage = new
TableConfigRefreshMessage(tableNameWithType);
@@ -3176,6 +3175,32 @@ public class PinotHelixResourceManager {
}
}
+ /// Sends table config and schema refresh message to servers.
+ private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
+ TableConfigSchemaRefreshMessage refreshMessage = new
TableConfigSchemaRefreshMessage(tableNameWithType);
+
+ // Send table config and schema refresh message to servers
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource(tableNameWithType);
+ recipientCriteria.setSessionSpecific(true);
+
+ // Send message with no callback and infinite timeout on the recipient
+ try {
+ int numMessagesSent =
_helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage,
null, -1);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} table config and schema refresh messages for
table: {}", numMessagesSent,
+ tableNameWithType);
+ } else {
+ LOGGER.warn("No table config and schema refresh message sent for
table: {}", tableNameWithType);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while sending table config and schema
refresh message for table: {}",
+ tableNameWithType, e);
+ }
+ }
+
private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
LogicalTableConfigRefreshMessage refreshMessage = new
LogicalTableConfigRefreshMessage(logicalTableName);
@@ -3239,7 +3264,7 @@ public class PinotHelixResourceManager {
private void sendRoutingTableRebuildMessage(String tableNameWithType) {
RoutingTableRebuildMessage routingTableRebuildMessage = new
RoutingTableRebuildMessage(tableNameWithType);
- // Send table config refresh message to brokers
+ // Send routing table rebuild message to brokers
Criteria recipientCriteria = new Criteria();
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
@@ -3258,27 +3283,6 @@ public class PinotHelixResourceManager {
}
}
- private void sendTableConfigSchemaRefreshMessage(String tableNameWithType,
List<String> instances) {
- TableConfigSchemaRefreshMessage refreshMessage = new
TableConfigSchemaRefreshMessage(tableNameWithType);
- for (String instance : instances) {
- // Send refresh message to servers
- Criteria recipientCriteria = new Criteria();
- recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
- recipientCriteria.setInstanceName(instance);
- recipientCriteria.setSessionSpecific(true);
- ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
- // Send message with no callback and infinite timeout on the recipient
- int numMessagesSent = messagingService.send(recipientCriteria,
refreshMessage, null, -1);
- if (numMessagesSent > 0) {
- LOGGER.info("Sent {} schema refresh messages to servers for table: {}
for instance: {}", numMessagesSent,
- tableNameWithType, instance);
- } else {
- LOGGER.warn("No schema refresh message sent to servers for table: {}
for instance: {}", tableNameWithType,
- instance);
- }
- }
- }
-
/**
* Update the instance config given the broker instance id
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6796b9bd02d..dd35dcf7c10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1382,10 +1382,9 @@ public abstract class BaseTableDataManager implements TableDataManager {
for (String columnName : segmentPhysicalColumns) {
ColumnMetadata columnMetadata =
segmentMetadata.getColumnMetadataFor(columnName);
- FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
DataSource source = segment.getDataSource(columnName);
- Preconditions.checkNotNull(columnMetadata);
- Preconditions.checkNotNull(source);
+ assert columnMetadata != null && source != null;
+ FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
// Column is deleted
if (fieldSpecInSchema == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
index ee413306b3b..bc4b19e52b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
@@ -65,10 +65,14 @@ public class MapFilterOperator extends BaseFilterOperator {
_columnName = arguments.get(0).getIdentifier();
_keyName = arguments.get(1).getLiteral().getStringValue();
- // Get JSON index and create operator
- DataSource dataSource = indexSegment.getDataSource(_columnName);
- JsonIndexReader jsonIndex = dataSource.getJsonIndex();
- if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+ JsonIndexReader jsonIndex = null;
+ if (canUseJsonIndex(_predicate.getType())) {
+ DataSource dataSource = indexSegment.getDataSourceNullable(_columnName);
+ if (dataSource != null) {
+ jsonIndex = dataSource.getJsonIndex();
+ }
+ }
+ if (jsonIndex != null) {
FilterContext filterContext = createFilterContext();
_jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex,
filterContext, numDocs);
_expressionFilterOperator = null;
@@ -201,7 +205,7 @@ public class MapFilterOperator extends BaseFilterOperator {
* @param predicateType The type of predicate
* @return true if the predicate type is supported for JSON index, false
otherwise
*/
- private boolean useJsonIndex(Predicate.Type predicateType) {
+ private static boolean canUseJsonIndex(Predicate.Type predicateType) {
switch (predicateType) {
case EQ:
case NOT_EQ:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index d5180285955..c422a64a7ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -324,7 +324,11 @@ public class StarTreeUtils {
return null;
}
String column = lhs.getIdentifier();
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ // Star-tree does not support non-existent column
+ return null;
+ }
Dictionary dictionary = dataSource.getDictionary();
if (dictionary == null) {
// Star-tree does not support non-dictionary encoded dimension
@@ -388,7 +392,11 @@ public class StarTreeUtils {
}
String column = aggregationFunctionColumnPair.getColumn();
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because aggregation column:
'{}' does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because aggregation column:
'{}' has null values", column);
return null;
@@ -396,7 +404,11 @@ public class StarTreeUtils {
}
for (String column : predicateEvaluatorsMap.keySet()) {
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because filter column: '{}'
does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because filter column: '{}'
has null values", column);
return null;
@@ -410,7 +422,11 @@ public class StarTreeUtils {
}
}
for (String column : groupByColumns) {
- DataSource dataSource = indexSegment.getDataSource(column);
+ DataSource dataSource = indexSegment.getDataSourceNullable(column);
+ if (dataSource == null) {
+ LOGGER.debug("Cannot use star-tree index because group-by column:
'{}' does not exist", column);
+ return null;
+ }
if (dataSource.getNullValueVector() != null &&
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
LOGGER.debug("Cannot use star-tree index because group-by column:
'{}' has null values", column);
return null;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7ce615a3b15..f40bfc8a5aa 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1701,6 +1701,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
updateTableConfig(tableConfig);
+
+ // Query the new added columns without reload
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode response = postQuery(TEST_STAR_TREE_QUERY_3);
+ return response.get("resultTable").get("rows").get(0).get(0).asInt()
== 0;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to query new added columns without reload");
+ // Table size shouldn't change without reload
+ assertEquals(getTableSize(getTableName()), _tableSize);
+
reloadAllSegments(TEST_STAR_TREE_QUERY_3, false, numTotalDocs);
int thirdQueryResult =
postQuery(TEST_STAR_TREE_QUERY_3_REFERENCE).get("resultTable").get("rows").get(0).get(0).asInt();
@@ -1869,6 +1882,18 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
CompressionCodec.MV_ENTRY_DICT, null));
updateTableConfig(tableConfig);
+ // Query the new added columns without reload
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode response = postQuery(SELECT_STAR_QUERY);
+ return
response.get("resultTable").get("dataSchema").get("columnNames").size() == 104;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 60_000L, "Failed to query new added columns without reload");
+ // Table size shouldn't change without reload
+ assertEquals(getTableSize(getTableName()), _tableSize);
+
// Trigger reload and verify column count
reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 01257777777..4a691e4fc42 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,7 +42,6 @@ public class PinotSegmentColumnReader implements Closeable {
public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
DataSource dataSource = indexSegment.getDataSource(column);
- Preconditions.checkArgument(dataSource != null, "Failed to find data
source for column: %s", column);
_forwardIndexReader = dataSource.getForwardIndex();
Preconditions.checkArgument(_forwardIndexReader != null, "Forward index
disabled for column: %s", column);
_forwardIndexReaderContext = _forwardIndexReader.createContext();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]