Copilot commented on code in PR #17308:
URL: https://github.com/apache/pinot/pull/17308#discussion_r2748958615
##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java:
##########
@@ -186,6 +254,15 @@ public PartialUpsertMerger getCustomMerger() {
};
}
+ private TableConfig createTableConfig(Schema schema, UpsertConfig upsertConfig) {
+ String tableName = schema.getSchemaName();
+ if (tableName == null || tableName.isEmpty()) {
+ tableName = "testTable";
+ }
Review Comment:
Use `StringUtils.isEmpty()` or check for blank strings instead of separate
null and isEmpty checks for consistency with project conventions.
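As a rough sketch of the reviewer's point without depending on a particular utility class, Java 11's `String.isBlank()` handles the empty and whitespace-only cases in one call. The class `TableNameDefaults` and the method `resolveTableName` are hypothetical, and the `"testTable"` default simply mirrors the test helper above. If Apache Commons Lang3 is on the classpath, `StringUtils.isEmpty(name)` keeps the helper's current null-or-empty semantics, and `StringUtils.isBlank(name)` also treats whitespace-only names as missing.

```java
final class TableNameDefaults {
  // Hypothetical constant mirroring the literal used in the test helper above.
  static final String DEFAULT_TABLE_NAME = "testTable";

  private TableNameDefaults() {
  }

  /** Returns the schema name, or the default when it is null, empty, or whitespace-only (Java 11+). */
  static String resolveTableName(String schemaName) {
    return (schemaName == null || schemaName.isBlank()) ? DEFAULT_TABLE_NAME : schemaName;
  }
}
```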
##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -58,6 +59,13 @@ public enum ConsistencyMode {
@Nullable
private Map<String, Strategy> _partialUpsertStrategies;
+ @JsonPropertyDescription("Transform configs evaluated after partial upsert merge to populate derived columns. Lives "
+ + "under UpsertConfig because these transforms are applied on the merged upsert view, not during ingestion. "
+ + "These transforms run during upsert merge (not query time) and derived columns must be part of the table "
+ + "schema to be queryable.")
Review Comment:
Corrected spelling of 'recieve' to 'receive' in comment.
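The property description says these transforms run on the merged upsert row rather than on the raw incoming record. The following is a minimal sketch of why that ordering matters. It assumes a simplified merge in which any non-null incoming value overwrites the previous one, which is much coarser than Pinot's per-column partial-upsert strategies such as `OVERWRITE` or `IGNORE`. `PostMergeTransformSketch` and the `price`/`quantity`/`total` columns are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class PostMergeTransformSketch {
  private PostMergeTransformSketch() {
  }

  /** Simplified partial-upsert merge: a non-null incoming value wins, otherwise the previous value is kept. */
  static Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> incoming) {
    Map<String, Object> merged = new HashMap<>(previous);
    for (Map.Entry<String, Object> e : incoming.entrySet()) {
      if (e.getValue() != null) {
        merged.put(e.getKey(), e.getValue());
      }
    }
    return merged;
  }

  /** Merges first, then recomputes every derived column from the merged row, replacing any stale value. */
  static Map<String, Object> mergeThenTransform(Map<String, Object> previous, Map<String, Object> incoming,
      Map<String, Function<Map<String, Object>, Object>> derivedColumns) {
    Map<String, Object> merged = merge(previous, incoming);
    for (Map.Entry<String, Function<Map<String, Object>, Object>> e : derivedColumns.entrySet()) {
      merged.put(e.getKey(), e.getValue().apply(merged));
    }
    return merged;
  }
}
```

Suppose the previous row is `{price=10, quantity=2, total=20}` and the incoming partial row is `{price=null, quantity=5}`. A derived `total = price * quantity` evaluates to 50 here because `price` is read from the merged row. Evaluating the same expression against the incoming record alone would see a null `price`.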
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -82,13 +163,15 @@ public void transform(GenericRow record) {
String column = entry.getKey();
FunctionEvaluator transformFunctionEvaluator = entry.getValue();
Object existingValue = record.getValue(column);
- if (existingValue == null) {
+ boolean shouldApplyTransform = _overwriteExistingValues || existingValue == null || record.isNullValue(column);
+ if (shouldApplyTransform) {
try {
// Skip transformation if column value already exists
// NOTE: column value might already exist for OFFLINE data,
// For backward compatibility, The only exception here is that we will override nested field like array,
// collection or map since they were not included in the record transformation before.
Review Comment:
Comment states 'Skip transformation if column value already exists' but the
code now applies the transform when `_overwriteExistingValues` is true. Update
the comment to reflect this new behavior accurately.
```suggestion
// Apply transformation when allowed by overwriteExistingValues or when the current value is null.
// NOTE: column value might already exist for OFFLINE data. For backward compatibility, we may override
// nested fields like arrays, collections, or maps since they were not included in the record
// transformation before.
```
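The new gate in `transform()` reduces to a single boolean expression, and the sketch below isolates it. `OverwriteGateSketch` is hypothetical, and a plain `Set` of column names stands in for `GenericRow.isNullValue(column)`. That set is an assumption about how null-valued fields are tracked, not the real row API.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class OverwriteGateSketch {
  private OverwriteGateSketch() {
  }

  /** Same condition as the diff: overwrite mode always applies; otherwise only missing or null-valued columns. */
  static boolean shouldApplyTransform(boolean overwriteExistingValues, Object existingValue, boolean isNullValue) {
    return overwriteExistingValues || existingValue == null || isNullValue;
  }

  /** Applies one transform to a row modeled as a mutable map; nullValueColumns is an assumed stand-in. */
  static void apply(Map<String, Object> row, Set<String> nullValueColumns, String column,
      Function<Map<String, Object>, Object> transform, boolean overwriteExistingValues) {
    Object existingValue = row.get(column);
    if (shouldApplyTransform(overwriteExistingValues, existingValue, nullValueColumns.contains(column))) {
      row.put(column, transform.apply(row));
    }
  }
}
```

With `overwriteExistingValues` set to false (the ingestion default), a stale non-null value is left alone. With it set to true (the post-upsert case), the derived column is recomputed on every merged row.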
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -46,15 +53,89 @@ public class ExpressionTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ExpressionTransformer.class);
@VisibleForTesting
- final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators;
+ final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new LinkedHashMap<>();
+ private final Set<String> _implicitMapTransformColumns = new HashSet<>();
private final boolean _continueOnError;
private final ThrottledLogger _throttledLogger;
+ /**
+ * If {@code true}, transform functions overwrite existing non-null values instead of skipping them. This is enabled
+ * for post-upsert transforms where derived columns should be recomputed on the merged row; otherwise transforms only
+ * populate missing or null-valued columns.
+ */
+ private final boolean _overwriteExistingValues;
public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
- _expressionEvaluators = ExpressionTransformerUtils.getTopologicallySortedExpressions(tableConfig, schema);
+ this(tableConfig, schema, null, false);
+ }
+
+ public ExpressionTransformer(TableConfig tableConfig, Schema schema,
+ @Nullable List<TransformConfig> transformConfigs, boolean overwriteExistingValues) {
+ Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ List<TransformConfig> resolvedTransformConfigs =
+ transformConfigs != null ? transformConfigs
+ : ingestionConfig != null ? ingestionConfig.getTransformConfigs() : null;
+ if (resolvedTransformConfigs != null) {
+ for (TransformConfig transformConfig : resolvedTransformConfigs) {
+ FunctionEvaluator previous = expressionEvaluators.put(transformConfig.getColumnName(),
+ FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
+ Preconditions.checkState(previous == null,
+ "Cannot set more than one ingestion transform function on column: %s.", transformConfig.getColumnName());
+ }
+ }
+ if (transformConfigs == null) {
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ String fieldName = fieldSpec.getName();
+ if (!fieldSpec.isVirtualColumn() && !expressionEvaluators.containsKey(fieldName)) {
+ FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+ if (functionEvaluator != null) {
+ expressionEvaluators.put(fieldName, functionEvaluator);
+ if (isImplicitMapTransform(fieldSpec)) {
+ _implicitMapTransformColumns.add(fieldName);
+ }
+ }
+ }
+ }
+ }
+
+ // Carry out DFS traversal to topologically sort column names based on transform function dependencies. Throw
+ // exception if a cycle is discovered. When a name is first seen it is added to discoveredNames set. When a name
+ // is completely processed (i.e the name and all of its dependencies have been fully explored and no cycles have
+ // been seen), it gets added to the _expressionEvaluators list in topologically sorted order. Fully explored
+ // names are removed from discoveredNames set.
+ Set<String> discoveredNames = new HashSet<>();
+ for (Map.Entry<String, FunctionEvaluator> entry : expressionEvaluators.entrySet()) {
+ String columnName = entry.getKey();
+ if (!_expressionEvaluators.containsKey(columnName)) {
+ topologicalSort(columnName, expressionEvaluators, discoveredNames);
+ }
+ }
+
_continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
_throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+ _overwriteExistingValues = overwriteExistingValues;
+ }
+
+ private void topologicalSort(String column, Map<String, FunctionEvaluator> expressionEvaluators,
+ Set<String> discoveredNames) {
+ FunctionEvaluator functionEvaluator = expressionEvaluators.get(column);
+ if (functionEvaluator == null) {
+ return;
+ }
+
+ if (discoveredNames.add(column)) {
+ List<String> arguments = functionEvaluator.getArguments();
+ for (String arg : arguments) {
+ if (!_expressionEvaluators.containsKey(arg)) {
+ topologicalSort(arg, expressionEvaluators, discoveredNames);
+ }
+ }
+ _expressionEvaluators.put(column, functionEvaluator);
+ discoveredNames.remove(column);
+ } else {
+ throw new IllegalStateException(
+ "Expression cycle found for column '" + column + "' in Ingestion Transform " + "Function definitions.");
Review Comment:
The error message has an unnecessary space before 'Function definitions' due
to string concatenation. Use String.format or combine the strings without extra
concatenation.
```suggestion
throw new IllegalStateException(String.format(
"Expression cycle found for column '%s' in Ingestion Transform Function definitions.", column));
```
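The constructor's DFS can be reduced to a standalone sketch. Each derived column maps to the list of columns its expression reads, which stands in for `FunctionEvaluator.getArguments()`. `TransformOrderSketch` and its `Map<String, List<String>>` input are hypothetical. The cycle error uses the `String.format` form suggested above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TransformOrderSketch {
  private TransformOrderSketch() {
  }

  /** Orders derived columns so each one comes after the derived columns it reads; plain inputs are skipped. */
  static List<String> sort(Map<String, List<String>> dependencies) {
    LinkedHashMap<String, List<String>> sorted = new LinkedHashMap<>();
    Set<String> discovered = new HashSet<>();
    for (String column : dependencies.keySet()) {
      if (!sorted.containsKey(column)) {
        visit(column, dependencies, discovered, sorted);
      }
    }
    return new ArrayList<>(sorted.keySet());
  }

  private static void visit(String column, Map<String, List<String>> dependencies, Set<String> discovered,
      LinkedHashMap<String, List<String>> sorted) {
    List<String> args = dependencies.get(column);
    if (args == null) {
      return; // not a derived column, so there is nothing to order
    }
    if (!discovered.add(column)) {
      // The column is still on the current DFS path: following its arguments led back to it.
      throw new IllegalStateException(String.format(
          "Expression cycle found for column '%s' in Ingestion Transform Function definitions.", column));
    }
    for (String arg : args) {
      if (!sorted.containsKey(arg)) {
        visit(arg, dependencies, discovered, sorted);
      }
    }
    sorted.put(column, args); // all dependencies are placed, so this column can follow them
    discovered.remove(column);
  }
}
```

Because a column leaves `discovered` only after it has been written to `sorted`, `discovered` always holds exactly the current DFS path. Revisiting a name in that set therefore signals a cycle.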
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]