twalthr commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1449130065


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -67,6 +67,8 @@ public final class UserDefinedFunctionHelper {
 
     public static final String SCALAR_EVAL = "eval";
 
+    public static final String ASYNC_SCALAR_EVAL = "eval";
+
     public static final String TABLE_EVAL = "eval";

Review Comment:
   That is definitely the best location as it is called right after 
registration in TableEnvironment. An additional check should be done after 
specialization in code generation.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java:
##########
@@ -348,6 +350,53 @@ public static Optional<Class<?>> extractSimpleGeneric(
         }
     }
 
+    /** Resolves a variable type while accepting a context for resolution. */
+    public static Type resolveVariableWithClassContext(@Nullable Type contextType, Type type) {
+        final List<Type> typeHierarchy;
+        if (contextType != null) {
+            typeHierarchy = collectTypeHierarchy(contextType);
+        } else {
+            typeHierarchy = Collections.emptyList();
+        }
+        if (!containsTypeVariable(type)) {

Review Comment:
   Not sure if this was intended, but this check excludes `WildcardType`,
   for example:
   ```
   eval(CompletableFuture<? extends Integer> s)
   ```
   That is correct, because the if clause further down could not handle it.
   I would at least add a comment to the `false` in `containsTypeVariable`.
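
To make the wildcard case concrete, here is a minimal, self-contained sketch of how plain JDK reflection reports the type argument of such a parameter. The classes `WildcardParamDemo`, `WildcardFunction`, and `VariableFunction` and the helper `futureTypeArgument` are hypothetical names used only for illustration; none of them are part of Flink, and the sketch does not call `containsTypeVariable` itself.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.concurrent.CompletableFuture;

class WildcardParamDemo {

    /** Hypothetical function whose future uses a wildcard type argument. */
    public static class WildcardFunction {
        public void eval(CompletableFuture<? extends Integer> s) {}
    }

    /** Hypothetical function whose future uses a class-level type variable. */
    public static class VariableFunction<T> {
        public void eval(CompletableFuture<T> s) {}
    }

    /** Returns the type argument of the CompletableFuture parameter of eval. */
    static Type futureTypeArgument(Class<?> clazz) {
        try {
            // Both eval methods erase to eval(CompletableFuture).
            Method eval = clazz.getMethod("eval", CompletableFuture.class);
            ParameterizedType param =
                    (ParameterizedType) eval.getGenericParameterTypes()[0];
            return param.getActualTypeArguments()[0];
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Type wildcard = futureTypeArgument(WildcardFunction.class);
        Type variable = futureTypeArgument(VariableFunction.class);
        // "? extends Integer" is a WildcardType, not a TypeVariable.
        System.out.println(wildcard instanceof WildcardType); // true
        System.out.println(wildcard instanceof TypeVariable); // false
        // "T" is a TypeVariable that needs a class context to resolve.
        System.out.println(variable instanceof TypeVariable); // true
    }
}
```

Since the wildcard is a separate `Type` subtype, a check that only looks for `TypeVariable` would not match it, which is the behavior described above.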



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -521,6 +529,49 @@ private static void validateImplementationMethod(
         }
     }
 
+    private static void validateAsyncImplementationMethod(
+            Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
+        final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
+        final List<Method> methods = getAllDeclaredMethods(clazz);
+        for (Method method : methods) {
+            if (!nameSet.contains(method.getName())) {
+                continue;
+            }
+            if (!method.getReturnType().equals(Void.TYPE)) {
+                throw new ValidationException(
+                        String.format(
+                                "Method '%s' of function class '%s' must be void.",
+                                method.getName(), clazz.getName()));
+            }
+            boolean foundParam = false;
+            boolean genericParam = false;
+            if (method.getParameterCount() >= 1) {
+                Type firstParam = method.getGenericParameterTypes()[0];
+                firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
+                if (CompletableFuture.class.equals(firstParam)) {
+                    foundParam = true;
+                } else if (firstParam instanceof ParameterizedType
+                        && CompletableFuture.class.equals(
+                                ((ParameterizedType) firstParam).getRawType())) {
+                    foundParam = true;
+                    genericParam = true;
+                }
+            }
+            if (!foundParam) {
+                throw new ValidationException(
+                        String.format(
+                                "Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
+                                method.getName(), clazz.getName()));
+            }
+            if (!genericParam) {

Review Comment:
   We shouldn't validate this: if someone uses a type inference with multiple
   types (i.e. no `FunctionMappingExtractor`), they might use raw types.
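
As a rough illustration of the raw-type case, the sketch below mirrors the `foundParam` / `genericParam` distinction from the diff using only JDK reflection. The names `RawFutureParamDemo`, `RawFunction`, `TypedFunction`, `FirstParamKind`, `classify`, and `classifyEval` are hypothetical and not part of Flink, and the class-context resolution step from the diff is left out for brevity.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

class RawFutureParamDemo {

    /** Hypothetical function that declares the future as a raw type. */
    public static class RawFunction {
        @SuppressWarnings("rawtypes")
        public void eval(CompletableFuture future, Integer i) {}
    }

    /** Hypothetical function that declares a parameterized future. */
    public static class TypedFunction {
        public void eval(CompletableFuture<Integer> future, Integer i) {}
    }

    enum FirstParamKind { RAW_FUTURE, PARAMETERIZED_FUTURE, OTHER }

    /** Classifies the first parameter of a method, similar to the check in the diff. */
    static FirstParamKind classify(Method method) {
        if (method.getParameterCount() < 1) {
            return FirstParamKind.OTHER;
        }
        Type first = method.getGenericParameterTypes()[0];
        if (CompletableFuture.class.equals(first)) {
            // Raw type: reflection reports the plain Class object.
            return FirstParamKind.RAW_FUTURE;
        }
        if (first instanceof ParameterizedType
                && CompletableFuture.class.equals(((ParameterizedType) first).getRawType())) {
            return FirstParamKind.PARAMETERIZED_FUTURE;
        }
        return FirstParamKind.OTHER;
    }

    /** Finds the eval method declared on the class and classifies it. */
    static FirstParamKind classifyEval(Class<?> clazz) {
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.getName().equals("eval")) {
                return classify(m);
            }
        }
        throw new IllegalArgumentException("No eval method in " + clazz.getName());
    }

    public static void main(String[] args) {
        System.out.println(classifyEval(RawFunction.class));   // RAW_FUTURE
        System.out.println(classifyEval(TypedFunction.class)); // PARAMETERIZED_FUTURE
    }
}
```

Both variants have a `CompletableFuture` as the first argument, so a validation that accepts either kind would still let a raw-typed `eval` through.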



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

