BruceKellan commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1129228090


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ExpressionUtils.java:
##########
@@ -177,4 +183,113 @@ public static Object 
getValueFromLiteral(ValueLiteralExpression expr) {
         throw new UnsupportedOperationException("Unsupported type: " + 
logicalType);
     }
   }
+
+  public static List<ResolvedExpression> 
filterSimpleCallExpression(List<ResolvedExpression> exprs) {
+    return exprs.stream()
+        .filter(ExpressionUtils::isSimpleCallExpression)
+        .collect(Collectors.toList());
+  }
+
+  public static Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> 
extractPartitionPredicateList(
+      List<ResolvedExpression> exprs,
+      List<String> partitionKeys,
+      RowType tableRowType) {
+    if (partitionKeys.isEmpty()) {
+      return Tuple2.of(exprs, Collections.emptyList());
+    } else {
+      List<ResolvedExpression> partitionFilters = new ArrayList<>();
+      List<ResolvedExpression> nonPartitionFilters = new ArrayList<>();
+      int[] partitionIdxMapping = 
tableRowType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray();
+      for (ResolvedExpression expr : exprs) {
+        for (CallExpression e : splitByAnd(expr)) {
+          CallExpression convertedExpr = applyMapping(e, partitionIdxMapping);
+          if (convertedExpr != null) {
+            partitionFilters.add(convertedExpr);
+          } else {
+            nonPartitionFilters.add(e);
+          }
+        }
+      }
+      return Tuple2.of(nonPartitionFilters, partitionFilters);
+    }
+  }
+
+  private static List<CallExpression> splitByAnd(ResolvedExpression expr) {
+    List<CallExpression> result = new ArrayList<>();
+    splitByAnd(expr, result);
+    return result;
+  }
+
+  private static void splitByAnd(
+      ResolvedExpression expr,
+      List<CallExpression> result) {
+    if (!(expr instanceof CallExpression)) {
+      return;
+    }
+    CallExpression callExpr = (CallExpression) expr;
+    FunctionDefinition funcDef = callExpr.getFunctionDefinition();
+
+    if (funcDef == BuiltInFunctionDefinitions.AND) {
+      callExpr.getChildren().stream()
+          .filter(child -> child instanceof CallExpression)
+          .forEach(child -> splitByAnd((CallExpression) child, result));
+    } else {
+      result.add(callExpr);
+    }
+  }
+
+  private static CallExpression applyMapping(CallExpression expr, int[] 
fieldIdxMapping) {
+    FunctionDefinition funcDef = expr.getFunctionDefinition();
+    if (funcDef == BuiltInFunctionDefinitions.IN
+        || funcDef == BuiltInFunctionDefinitions.EQUALS
+        || funcDef == BuiltInFunctionDefinitions.NOT_EQUALS
+        || funcDef == BuiltInFunctionDefinitions.IS_NULL
+        || funcDef == BuiltInFunctionDefinitions.IS_NOT_NULL
+        || funcDef == BuiltInFunctionDefinitions.LESS_THAN
+        || funcDef == BuiltInFunctionDefinitions.GREATER_THAN
+        || funcDef == BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL
+        || funcDef == BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL) {
+      List<Expression> children = expr.getChildren();
+      List<ResolvedExpression> newChildren = children.stream()
+          .map(
+              child -> {
+                if (child instanceof FieldReferenceExpression) {
+                  FieldReferenceExpression refExpr = 
(FieldReferenceExpression) child;
+                  int target = fieldIdxMapping[refExpr.getFieldIndex()];
+                  if (target >= 0) {
+                    return new FieldReferenceExpression(
+                        refExpr.getName(),
+                        refExpr.getOutputDataType(),
+                        refExpr.getInputIndex(),
+                        target);
+                  } else {
+                    return null;
+                  }
+                } else {
+                  return (ResolvedExpression) child;
+                }
+              })
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList());
+      if (newChildren.size() == children.size()) {
+        return expr.replaceArgs(newChildren, expr.getOutputDataType());

Review Comment:
   `CallExpression#replaceArgs` does not exist in Flink 1.13.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to