Copilot commented on code in PR #17308:
URL: https://github.com/apache/pinot/pull/17308#discussion_r2748958615


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java:
##########
@@ -186,6 +254,15 @@ public PartialUpsertMerger getCustomMerger() {
     };
   }
 
+  private TableConfig createTableConfig(Schema schema, UpsertConfig 
upsertConfig) {
+    String tableName = schema.getSchemaName();
+    if (tableName == null || tableName.isEmpty()) {
+      tableName = "testTable";
+    }

Review Comment:
   Use `StringUtils.isEmpty()` or check for blank strings instead of separate 
null and isEmpty checks for consistency with project conventions.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -58,6 +59,13 @@ public enum ConsistencyMode {
   @Nullable
   private Map<String, Strategy> _partialUpsertStrategies;
 
+  @JsonPropertyDescription("Transform configs evaluated after partial upsert 
merge to populate derived columns. Lives "
+      + "under UpsertConfig because these transforms are applied on the merged 
upsert view, not during ingestion. "
+      + "These transforms run during upsert merge (not query time) and derived 
columns must be part of the table "
+      + "schema to be queryable.")

Review Comment:
   Corrected spelling of 'recieve' to 'receive' in comment.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -82,13 +163,15 @@ public void transform(GenericRow record) {
       String column = entry.getKey();
       FunctionEvaluator transformFunctionEvaluator = entry.getValue();
       Object existingValue = record.getValue(column);
-      if (existingValue == null) {
+      boolean shouldApplyTransform = _overwriteExistingValues || existingValue 
== null || record.isNullValue(column);
+      if (shouldApplyTransform) {
         try {
           // Skip transformation if column value already exists
           // NOTE: column value might already exist for OFFLINE data,
           // For backward compatibility, The only exception here is that we 
will override nested field like array,
           // collection or map since they were not included in the record 
transformation before.

Review Comment:
   Comment states 'Skip transformation if column value already exists' but the 
code now applies the transform when `_overwriteExistingValues` is true. Update 
the comment to reflect this new behavior accurately.
   ```suggestion
             // Apply transformation when allowed by overwriteExistingValues or 
when the current value is null.
             // NOTE: column value might already exist for OFFLINE data. For 
backward compatibility, we may override
             // nested fields like arrays, collections, or maps since they were 
not included in the record
             // transformation before.
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java:
##########
@@ -46,15 +53,89 @@ public class ExpressionTransformer implements 
RecordTransformer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ExpressionTransformer.class);
 
   @VisibleForTesting
-  final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators;
+  final LinkedHashMap<String, FunctionEvaluator> _expressionEvaluators = new 
LinkedHashMap<>();
+  private final Set<String> _implicitMapTransformColumns = new HashSet<>();
   private final boolean _continueOnError;
   private final ThrottledLogger _throttledLogger;
+  /**
+   * If {@code true}, transform functions overwrite existing non-null values 
instead of skipping them. This is enabled
+   * for post-upsert transforms where derived columns should be recomputed on 
the merged row; otherwise transforms only
+   * populate missing or null-valued columns.
+   */
+  private final boolean _overwriteExistingValues;
 
   public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
-    _expressionEvaluators = 
ExpressionTransformerUtils.getTopologicallySortedExpressions(tableConfig, 
schema);
+    this(tableConfig, schema, null, false);
+  }
+
+  public ExpressionTransformer(TableConfig tableConfig, Schema schema,
+      @Nullable List<TransformConfig> transformConfigs, boolean 
overwriteExistingValues) {
+    Map<String, FunctionEvaluator> expressionEvaluators = new HashMap<>();
     IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    List<TransformConfig> resolvedTransformConfigs =
+        transformConfigs != null ? transformConfigs
+            : ingestionConfig != null ? ingestionConfig.getTransformConfigs() 
: null;
+    if (resolvedTransformConfigs != null) {
+      for (TransformConfig transformConfig : resolvedTransformConfigs) {
+        FunctionEvaluator previous = 
expressionEvaluators.put(transformConfig.getColumnName(),
+            
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction()));
+        Preconditions.checkState(previous == null,
+            "Cannot set more than one ingestion transform function on column: 
%s.", transformConfig.getColumnName());
+      }
+    }
+    if (transformConfigs == null) {
+      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+        String fieldName = fieldSpec.getName();
+        if (!fieldSpec.isVirtualColumn() && 
!expressionEvaluators.containsKey(fieldName)) {
+          FunctionEvaluator functionEvaluator = 
FunctionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
+          if (functionEvaluator != null) {
+            expressionEvaluators.put(fieldName, functionEvaluator);
+            if (isImplicitMapTransform(fieldSpec)) {
+              _implicitMapTransformColumns.add(fieldName);
+            }
+          }
+        }
+      }
+    }
+
+    // Carry out DFS traversal to topologically sort column names based on 
transform function dependencies. Throw
+    // exception if a cycle is discovered. When a name is first seen it is 
added to discoveredNames set. When a name
+    // is completely processed (i.e the name and all of its dependencies have 
been fully explored and no cycles have
+    // been seen), it gets added to the _expressionEvaluators list in 
topologically sorted order. Fully explored
+    // names are removed from discoveredNames set.
+    Set<String> discoveredNames = new HashSet<>();
+    for (Map.Entry<String, FunctionEvaluator> entry : 
expressionEvaluators.entrySet()) {
+      String columnName = entry.getKey();
+      if (!_expressionEvaluators.containsKey(columnName)) {
+        topologicalSort(columnName, expressionEvaluators, discoveredNames);
+      }
+    }
+
     _continueOnError = ingestionConfig != null && 
ingestionConfig.isContinueOnError();
     _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+    _overwriteExistingValues = overwriteExistingValues;
+  }
+
+  private void topologicalSort(String column, Map<String, FunctionEvaluator> 
expressionEvaluators,
+      Set<String> discoveredNames) {
+    FunctionEvaluator functionEvaluator = expressionEvaluators.get(column);
+    if (functionEvaluator == null) {
+      return;
+    }
+
+    if (discoveredNames.add(column)) {
+      List<String> arguments = functionEvaluator.getArguments();
+      for (String arg : arguments) {
+        if (!_expressionEvaluators.containsKey(arg)) {
+          topologicalSort(arg, expressionEvaluators, discoveredNames);
+        }
+      }
+      _expressionEvaluators.put(column, functionEvaluator);
+      discoveredNames.remove(column);
+    } else {
+      throw new IllegalStateException(
+          "Expression cycle found for column '" + column + "' in Ingestion 
Transform " + "Function definitions.");

Review Comment:
   The error message has an unnecessary space before 'Function definitions' due 
to string concatenation. Use String.format or combine the strings without extra 
concatenation.
   ```suggestion
         throw new IllegalStateException(String.format(
             "Expression cycle found for column '%s' in Ingestion Transform 
Function definitions.", column));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to