SteveStevenpoor commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2502440536
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +695,705 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and FLOOR
+ * on the necessary fields. As a result, the planner cannot fuse all joins into
+ * a single MultiJoin node initially.
+ */
+ @Test
+ @Tag("expected-multijoin-chain")
+ void testFourWayJoinNoCommonJoinKeyWithFunctionInCondition() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * We expect the join inputs to **not** merge into a single MultiJoin node in this case,
+ * because `documents.common_id` is different from `other_documents.common_id`.
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexCommonJoinKeyMissingProjection() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexConditionalLogicWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexCTEWithMultiJoin() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+
+ @Test
+ @Tag("no-common-join-key")
Review Comment:
> I'm not sure if that's the usual use case for tags?
>
> I actually wanted to have something in the test name so that I can read
> the .xml file and immediately understand if the plan looks good.
I think it's pretty common. I thought that it would be great to run only
"multijoin-chain" tests in the future to see if the specific case is supported
now. Anyway, if you think changing the name is better, I will do as you suggest.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]