twalthr commented on a change in pull request #8929: [FLINK-13028][table] Move expression resolver to flink-table-api-java
URL: https://github.com/apache/flink/pull/8929#discussion_r299323172
 
 

 ##########
 File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java
 ##########
 @@ -38,75 +40,103 @@
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.valueLiteral;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
+
 /**
  * This rule checks if a {@link UnresolvedCallExpression} can work with the 
given arguments and infers
  * the output data type. All function calls are resolved {@link 
CallExpression} after applying this
- * rule except for the special case of {@link 
BuiltInFunctionDefinitions#FLATTEN}.
+ * rule.
+ *
+ * <p>This rule also resolves {@code flatten()} calls on composite types.
  *
  * <p>If the call expects different types of arguments, but the given 
arguments have types that can
  * be casted, a {@link BuiltInFunctionDefinitions#CAST} expression is inserted.
- *
- * @see ResolveFlattenCallRule
  */
 @Internal
 final class ResolveCallByArgumentsRule implements ResolverRule {
 
        @Override
        public List<Expression> apply(List<Expression> expression, 
ResolutionContext context) {
                return expression.stream()
-                       .map(expr -> expr.accept(new 
CallArgumentsCastingVisitor(context)))
+                       .flatMap(expr -> expr.accept(new 
ResolvingCallVisitor(context)).stream())
                        .collect(Collectors.toList());
        }
 
-       private class CallArgumentsCastingVisitor extends 
RuleExpressionVisitor<Expression> {
+       // 
--------------------------------------------------------------------------------------------
+
+       private class ResolvingCallVisitor extends 
RuleExpressionVisitor<List<ResolvedExpression>> {
 
-               CallArgumentsCastingVisitor(ResolutionContext context) {
+               ResolvingCallVisitor(ResolutionContext context) {
                        super(context);
                }
 
                @Override
-               public Expression visit(UnresolvedCallExpression 
unresolvedCall) {
+               public List<ResolvedExpression> visit(UnresolvedCallExpression 
unresolvedCall) {
 
                        final List<ResolvedExpression> resolvedArgs = 
unresolvedCall.getChildren().stream()
-                               .map(c -> c.accept(this))
-                               .map(e -> {
-                                       // special case: FLATTEN
-                                       // a call chain 
`myFunc().flatten().flatten()` is not allowed
-                                       if (e instanceof 
UnresolvedCallExpression &&
-                                                       
((UnresolvedCallExpression) e).getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-                                               throw new 
ValidationException("Consecutive flattening calls are not allowed.");
-                                       }
-                                       if (e instanceof ResolvedExpression) {
-                                               return (ResolvedExpression) e;
-                                       }
-                                       throw new TableException("Unexpected 
unresolved expression: " + e);
-                               })
+                               .flatMap(c -> c.accept(this).stream())
                                .collect(Collectors.toList());
 
-                       // FLATTEN is a special case and the only call that 
remains unresolved after this rule
-                       // it will be resolved by ResolveFlattenCallRule
                        if (unresolvedCall.getFunctionDefinition() == 
BuiltInFunctionDefinitions.FLATTEN) {
-                               return unresolvedCall.replaceArgs(new 
ArrayList<>(resolvedArgs));
+                               return executeFlatten(resolvedArgs);
                        }
 
                        if (unresolvedCall.getFunctionDefinition() instanceof 
BuiltInFunctionDefinition) {
                                final BuiltInFunctionDefinition definition =
                                        (BuiltInFunctionDefinition) 
unresolvedCall.getFunctionDefinition();
 
                                if 
(definition.getTypeInference().getOutputTypeStrategy() != 
TypeStrategies.MISSING) {
-                                       return runTypeInference(
-                                               unresolvedCall,
-                                               definition.getTypeInference(),
-                                               resolvedArgs);
+                                       return Collections.singletonList(
+                                               runTypeInference(
+                                                       unresolvedCall,
+                                                       
definition.getTypeInference(),
+                                                       resolvedArgs));
                                }
                        }
-                       return runLegacyTypeInference(unresolvedCall, 
resolvedArgs);
+                       return Collections.singletonList(
+                               runLegacyTypeInference(unresolvedCall, 
resolvedArgs));
+               }
+
+               @Override
+               protected List<ResolvedExpression> defaultMethod(Expression 
expression) {
+                       if (expression instanceof ResolvedExpression) {
+                               return 
Collections.singletonList((ResolvedExpression) expression);
+                       }
+                       throw new TableException("Unexpected unresolved 
expression: " + expression);
+               }
+
+               private List<ResolvedExpression> 
executeFlatten(List<ResolvedExpression> args) {
+                       if (args.size() != 1) {
+                               throw new ValidationException("Invalid number 
of arguments for flattening.");
+                       }
+                       final ResolvedExpression composite = args.get(0);
+                       final TypeInformation<?> resultType = 
fromDataTypeToLegacyInfo(composite.getOutputDataType());
 
 Review comment:
   Structured types cannot be created yet. For row types it should work.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to