twalthr commented on code in PR #26331:
URL: https://github.com/apache/flink/pull/26331#discussion_r2010023689
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java:
##########
@@ -787,4 +827,87 @@ public Table select(Expression... fields) {
                             Arrays.asList(fields), table.operationTree, overWindows));
         }
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Partitioned Table
+    // --------------------------------------------------------------------------------------------
+
+    private static final class PartitionedTableImpl implements PartitionedTable {
+
+        private final TableImpl table;
+        private final List<Expression> partitionKeys;
+
+        private PartitionedTableImpl(TableImpl table, List<Expression> partitionKeys) {
+            this.table = table;
+            this.partitionKeys = partitionKeys;
+        }
+
+        @Override
+        public ApiExpression asArgument(String name) {
+            return createArgumentExpression(
+                    createPartitionQueryOperation(), table.tableEnvironment, name);
+        }
+
+        @Override
+        public Table process(String path, Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    path,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), table.tableEnvironment, arguments));
+        }
+
+        @Override
+        public Table process(Class<? extends UserDefinedFunction> function, Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    function,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), table.tableEnvironment, arguments));
+        }
+
+        private QueryOperation createPartitionQueryOperation() {
+            return table.operationTreeBuilder.partition(partitionKeys, table.operationTree);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Shared methods
+    // --------------------------------------------------------------------------------------------
+
+    private TableImpl createTable(QueryOperation operation) {
+        return new TableImpl(tableEnvironment, operation, operationTreeBuilder, lookupResolver);
+    }
+
+    private List<Expression> preprocessExpressions(List<Expression> expressions) {
+        return preprocessExpressions(expressions.toArray(new Expression[0]));
+    }
+
+    private List<Expression> preprocessExpressions(Expression[] expressions) {
+        return Arrays.stream(expressions)
+                .map(f -> f.accept(lookupResolver))
+                .collect(Collectors.toList());
+    }
+
+    private static Object[] unionTableAndArguments(
+            QueryOperation queryOperation, TableEnvironment env, Object... arguments) {
+        return Stream.concat(
+                        Stream.of(ApiExpressionUtils.tableRef("ptf_arg", queryOperation, env)),
+                        Stream.of(arguments))
+                .toArray();
+    }
+
+    private static ApiExpression createArgumentExpression(
+            QueryOperation queryOperation, TableEnvironment env, String name) {
+        return new ApiExpression(
+                ApiExpressionUtils.unresolvedCall(
+                        BuiltInFunctionDefinitions.ASSIGNMENT,
+                        lit(name),
+                        ApiExpressionUtils.tableRef(name, queryOperation, env)));
+    }
+
+    private void checkCommonTableEnvironment(Table right) {
+        if (((TableImpl) right).getTableEnvironment() != tableEnvironment) {
+            throw new ValidationException(
+                    "Only tables from the same TableEnvironment can be joined.");
+        }
+    }

Review Comment:
   This should not be a concern. I'm not aware that anyone implements custom `Table`s. Why would anyone implement their own Flink API? We can still relax this if it causes issues in the future.
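   For context, a minimal usage sketch of the API surface added in this hunk. This is not part of the diff: the `partitionBy` entry point, the function name, and the column names are assumptions for illustration only; the calls to `process`, `asArgument`, and `fromCall` mirror the signatures visible above.

   ```java
   import static org.apache.flink.table.api.Expressions.$;
   import static org.apache.flink.table.api.Expressions.lit;

   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.Table;
   import org.apache.flink.table.api.TableEnvironment;

   public class PartitionedTableSketch {

       public static void main(String[] args) {
           TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

           // Assumed to already exist in the catalog; used only for illustration.
           Table orders = env.from("Orders");

           // Positional call: PartitionedTable#process prepends the partitioned table as the
           // implicit first argument (see unionTableAndArguments) before the scalar arguments.
           Table positional =
                   orders.partitionBy($("customer_id")) // assumed entry point returning PartitionedTable
                           .process("MyProcessTableFunction", lit(10));

           // Named call: asArgument wraps the partitioned input into a named assignment
           // (see createArgumentExpression) that can be passed to TableEnvironment#fromCall.
           Table named =
                   env.fromCall(
                           "MyProcessTableFunction",
                           orders.partitionBy($("customer_id")).asArgument("input_table"));
       }
   }
   ```

   The positional form relies on the synthetic `ptf_arg` table reference created in `unionTableAndArguments`, while the named form goes through the `ASSIGNMENT` call built in `createArgumentExpression`.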