twalthr commented on code in PR #26331:
URL: https://github.com/apache/flink/pull/26331#discussion_r2010023689


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java:
##########
@@ -787,4 +827,87 @@ public Table select(Expression... fields) {
                             Arrays.asList(fields), table.operationTree, 
overWindows));
         }
     }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Partitioned Table
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final class PartitionedTableImpl implements 
PartitionedTable {
+
+        private final TableImpl table;
+        private final List<Expression> partitionKeys;
+
+        private PartitionedTableImpl(TableImpl table, List<Expression> 
partitionKeys) {
+            this.table = table;
+            this.partitionKeys = partitionKeys;
+        }
+
+        @Override
+        public ApiExpression asArgument(String name) {
+            return createArgumentExpression(
+                    createPartitionQueryOperation(), table.tableEnvironment, 
name);
+        }
+
+        @Override
+        public Table process(String path, Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    path,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
+        }
+
+        @Override
+        public Table process(Class<? extends UserDefinedFunction> function, 
Object... arguments) {
+            return table.tableEnvironment.fromCall(
+                    function,
+                    unionTableAndArguments(
+                            createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
+        }
+
+        private QueryOperation createPartitionQueryOperation() {
+            return table.operationTreeBuilder.partition(partitionKeys, 
table.operationTree);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Shared methods
+    // 
--------------------------------------------------------------------------------------------
+
+    private TableImpl createTable(QueryOperation operation) {
+        return new TableImpl(tableEnvironment, operation, 
operationTreeBuilder, lookupResolver);
+    }
+
+    private List<Expression> preprocessExpressions(List<Expression> 
expressions) {
+        return preprocessExpressions(expressions.toArray(new Expression[0]));
+    }
+
+    private List<Expression> preprocessExpressions(Expression[] expressions) {
+        return Arrays.stream(expressions)
+                .map(f -> f.accept(lookupResolver))
+                .collect(Collectors.toList());
+    }
+
+    private static Object[] unionTableAndArguments(
+            QueryOperation queryOperation, TableEnvironment env, Object... 
arguments) {
+        return Stream.concat(
+                        Stream.of(ApiExpressionUtils.tableRef("ptf_arg", 
queryOperation, env)),
+                        Stream.of(arguments))
+                .toArray();
+    }
+
+    private static ApiExpression createArgumentExpression(
+            QueryOperation queryOperation, TableEnvironment env, String name) {
+        return new ApiExpression(
+                ApiExpressionUtils.unresolvedCall(
+                        BuiltInFunctionDefinitions.ASSIGNMENT,
+                        lit(name),
+                        ApiExpressionUtils.tableRef(name, queryOperation, 
env)));
+    }
+
+    private void checkCommonTableEnvironment(Table right) {
+        if (((TableImpl) right).getTableEnvironment() != tableEnvironment) {
+            throw new ValidationException(
+                    "Only tables from the same TableEnvironment can be 
joined.");
+        }
+    }

Review Comment:
   This should not be a concern. I'm not aware that anyone implements custom 
`Table`s. Why should people implement their own Flink API. We can still relax 
this if it causes issues in the future. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to