AlanConfluent commented on code in PR #23975:
URL: https://github.com/apache/flink/pull/23975#discussion_r1451023480
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/ExtractionUtils.java:
##########
@@ -348,6 +350,53 @@ public static Optional<Class<?>> extractSimpleGeneric(
         }
     }
 
+    /** Resolves a variable type while accepting a context for resolution. */
+    public static Type resolveVariableWithClassContext(@Nullable Type contextType, Type type) {
+        final List<Type> typeHierarchy;
+        if (contextType != null) {
+            typeHierarchy = collectTypeHierarchy(contextType);
+        } else {
+            typeHierarchy = Collections.emptyList();
+        }
+        if (!containsTypeVariable(type)) {

Review Comment:
   I excluded that intentionally. Even if a parameter were declared as `CompletableFuture<? extends Number>`, I don't think it would be easy to call back on the future with a concrete type, since I believe that is a compile-time error. I tried a few examples of odd method overriding to allow calling back on that future and got one to work, but it's awkward (and it throws a reasonable error when you run it anyhow). I will add a comment in `containsTypeVariable` about keeping it in sync with this method.


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java:
##########
@@ -521,6 +529,49 @@ private static void validateImplementationMethod(
         }
     }
 
+    private static void validateAsyncImplementationMethod(
+            Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
+        final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
+        final List<Method> methods = getAllDeclaredMethods(clazz);
+        for (Method method : methods) {
+            if (!nameSet.contains(method.getName())) {
+                continue;
+            }
+            if (!method.getReturnType().equals(Void.TYPE)) {
+                throw new ValidationException(
+                        String.format(
+                                "Method '%s' of function class '%s' must be void.",
+                                method.getName(), clazz.getName()));
+            }
+            boolean foundParam = false;
+            boolean genericParam = false;
+            if (method.getParameterCount() >= 1) {
+                Type firstParam = method.getGenericParameterTypes()[0];
+                firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
+                if (CompletableFuture.class.equals(firstParam)) {
+                    foundParam = true;
+                } else if (firstParam instanceof ParameterizedType
+                        && CompletableFuture.class.equals(
+                                ((ParameterizedType) firstParam).getRawType())) {
+                    foundParam = true;
+                    genericParam = true;
+                }
+            }
+            if (!foundParam) {
+                throw new ValidationException(
+                        String.format(
+                                "Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
+                                method.getName(), clazz.getName()));
+            }
+            if (!genericParam) {

Review Comment:
   Ah, that makes sense. OK, removed.
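
For readers following the two threads above, here is a minimal standalone sketch of the cases being discussed: a raw `CompletableFuture` first parameter, a parameterized one, and the wildcard form `CompletableFuture<? extends Number>`. It uses only `java.lang.reflect` and is not the PR's implementation. `FutureParamSketch`, `ExampleFunction`, and `classify` are hypothetical names introduced for illustration, and the sketch does not call `ExtractionUtils` or any other Flink class.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.concurrent.CompletableFuture;

public class FutureParamSketch {

    /** Hypothetical stand-in for a user-defined async function; not a Flink class. */
    public static class ExampleFunction {
        @SuppressWarnings("rawtypes")
        public void evalRaw(CompletableFuture future, int x) {}

        public void evalTyped(CompletableFuture<Integer> future, int x) {
            future.complete(x); // fine: the type argument is concrete
        }

        public void evalWildcard(CompletableFuture<? extends Number> future, int x) {
            // future.complete(x); // rejected by javac: Integer is not the captured wildcard type
            future.complete(null); // only null is accepted without an unchecked cast
        }

        public void evalNoFuture(int x) {}
    }

    /** Classifies the first parameter of the named method on ExampleFunction. */
    public static String classify(String methodName) {
        for (Method method : ExampleFunction.class.getDeclaredMethods()) {
            if (!method.getName().equals(methodName)) {
                continue;
            }
            if (method.getParameterCount() == 0) {
                return "missing";
            }
            Type first = method.getGenericParameterTypes()[0];
            if (CompletableFuture.class.equals(first)) {
                return "raw"; // declared without type arguments
            }
            if (first instanceof ParameterizedType
                    && CompletableFuture.class.equals(((ParameterizedType) first).getRawType())) {
                Type arg = ((ParameterizedType) first).getActualTypeArguments()[0];
                return arg instanceof WildcardType ? "wildcard" : "parameterized";
            }
            return "missing"; // first parameter is not a CompletableFuture
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        for (String name : new String[] {"evalRaw", "evalTyped", "evalWildcard", "evalNoFuture"}) {
            System.out.println(name + " -> " + classify(name));
        }
    }
}
```

The commented-out `future.complete(x)` in `evalWildcard` is the compile-time error referred to in the first comment. The raw and parameterized branches in `classify` mirror the `foundParam` / `genericParam` distinction in the quoted diff.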