godfreyhe commented on a change in pull request #12851:
URL: https://github.com/apache/flink/pull/12851#discussion_r452219045



##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -449,6 +488,127 @@ public void applyProjection(int[][] projectedFields) {
                        this.projectedFields = 
Arrays.stream(projectedFields).mapToInt(f -> f[0]).toArray();
                }
 
+               @Override
+               public Result applyFilters(List<ResolvedExpression> filters) {
+                       List<ResolvedExpression> acceptedFilters = new 
ArrayList<>();
+                       List<ResolvedExpression> remainingFilters = new 
ArrayList<>();
+                       for (ResolvedExpression expr : filters) {
+                               if (shouldPushDown(expr)) {
+                                       acceptedFilters.add(expr);
+                               } else {
+                                       remainingFilters.add(expr);
+                               }
+                       }
+                       this.filterPredicates = acceptedFilters;
+                       return Result.of(acceptedFilters, remainingFilters);
+               }
+
+               private Boolean shouldPushDown(Expression expr) {
+                       if (expr instanceof CallExpression && 
expr.getChildren().size() == 2) {
+                               return 
shouldPushDownUnaryExpression(expr.getChildren().get(0))
+                                       && 
shouldPushDownUnaryExpression(expr.getChildren().get(1));
+                       }
+                       return false;
+               }
+
+               private boolean shouldPushDownUnaryExpression(Expression expr) {
+                       if (expr instanceof FieldReferenceExpression) {
+                               if 
(filterableFields.contains(((FieldReferenceExpression) expr).getName())) {
+                                       return true;
+                               }
+                       }
+
+                       if (expr instanceof ValueLiteralExpression) {
+                               return true;
+                       }
+
+                       if (expr instanceof CallExpression && 
expr.getChildren().size() == 1) {
+                               if (((CallExpression) 
expr).getFunctionDefinition().equals(UPPER)
+                                       || ((CallExpression) 
expr).getFunctionDefinition().equals(BuiltInFunctionDefinitions.LOWER)) {
+                                       return 
shouldPushDownUnaryExpression(expr.getChildren().get(0));
+                               }
+                       }
+                       // other resolved expressions return false
+                       return false;
+               }
+
+               private Boolean applyPredicatesToRow(Row row) {
+                       if (filterPredicates == null) {
+                               return true;
+                       }
+                       for (ResolvedExpression expr : filterPredicates) {
+                               if (expr instanceof CallExpression && 
expr.getChildren().size() == 2) {
+                                       if 
(!binaryFilterApplies((CallExpression) expr, row)) {
+                                               return false;
+                                       }
+                               } else {
+                                       throw new RuntimeException(expr + " not 
supported!");
+                               }
+                       }
+                       return true;
+               }
+
+               private boolean binaryFilterApplies(CallExpression binExpr, Row 
row) {
+                       List<Expression> children = binExpr.getChildren();
+                       Preconditions.checkArgument(children.size() == 2);
+                       Tuple2<Comparable, Comparable> tuple2 = 
extractValues(binExpr, row);

Review comment:
       We should limit the supported types of filter fields in the `applyFilters` 
method; otherwise, we can't extract the values as `Comparable` directly, because 
some types are not `Comparable`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to