snuyanzin commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2524216489
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java:
##########
@@ -684,4 +695,705 @@ void testMultiSinkOnMultiJoinedView() {
false,
false);
}
+
+ /*
+ * Four-way LEFT JOIN chain: Users -> Orders (on user_id) -> Payments
+ * (on user_id plus function-based conditions) -> Shipments
+ * (p.payment_id = s.location). The Payments join condition applies
+ * UPPER to u.name / p.payment_id and FLOOR to u.cash / p.price.
+ * Users, Orders, Payments and Shipments are not created in this method;
+ * presumably they are registered in the test class setup (not visible here).
+ *
+ * Calcite adds a LogicalProject to compute expressions such as UPPER and
+ * FLOOR on the necessary fields. As a result, the planner cannot fuse all
+ * joins into a single MultiJoin node initially, so a chain of MultiJoin
+ * nodes is expected instead (hence the "multijoin-chain-expected" tag).
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("multijoin-chain-expected")
+ void testFourWayJoinWithFunctionInConditionMultiJoinChainExpected() {
+ util.verifyRelPlan(
+ "SELECT u.user_id, u.name, o.order_id, p.payment_id,
s.location "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON o.user_id = u.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + " AND UPPER(u.name) = UPPER(p.payment_id) "
+ + " AND (FLOOR(u.cash) >= FLOOR(p.price) OR
p.price < 0) "
+ + "LEFT JOIN Shipments s ON p.payment_id = s.location
");
+ }
+
+ /*
+ * Assignments LEFT JOIN Documents LEFT JOIN Documents: the same Documents
+ * table is joined twice, under the aliases "documents" and
+ * "other_documents". Both tables are created by this test.
+ *
+ * We expect the join inputs to **not** merge into a single MultiJoin node
+ * in this case, because the two joins share no common join key:
+ * - the first join is keyed on assignments.detail_id and
+ *   assignments.common_id;
+ * - the second join is keyed on assignments.user_id (matched against
+ *   other_documents.common_id).
+ * In particular, documents.common_id and other_documents.common_id are
+ * matched against different Assignments columns.
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexCommonJoinKeyMissingProjectionNoCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN Documents AS other_documents\n"
+ + " ON assignments.user_id =
other_documents.common_id\n");
+ }
+ /*
+ * Six-way LEFT JOIN chain over Assignments, Customers (joined twice, as
+ * "customer" and "creators"), Documents, PhaseDetails and Organizations.
+ * All five tables are created by this test.
+ *
+ * Every join condition contains an equality on a common_id column, which
+ * presumably provides the common join key this test targets (TODO confirm
+ * against the expected plan).
+ *
+ * This test checks the plan via verifyExecPlan rather than verifyRelPlan.
+ */
+ @Test
+ void testComplexCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Assignments ("
+ + " assignment_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " user_id STRING,"
+ + " detail_id STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Customers ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " depart_num STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Documents ("
+ + " detail_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " creator_nm STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PhaseDetails ("
+ + " phase_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Organizations ("
+ + " org_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " org_name STRING,"
+ + " common_id STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyExecPlan(
+ "SELECT *\n"
+ + " FROM Assignments assignments\n"
+ + " LEFT JOIN Customers AS customer\n"
+ + " ON assignments.user_id = customer.user_id\n"
+ + " AND assignments.common_id =
customer.common_id\n"
+ + " LEFT JOIN Documents AS documents\n"
+ + " ON assignments.detail_id =
documents.detail_id\n"
+ + " AND assignments.common_id =
documents.common_id\n"
+ + " LEFT JOIN PhaseDetails AS phase_details\n"
+ + " ON documents.common_id =
phase_details.common_id\n"
+ + " LEFT JOIN Organizations AS organizations\n"
+ + " ON customer.depart_num =
organizations.org_id\n"
+ + " AND customer.common_id =
organizations.common_id\n"
+ + " LEFT JOIN Customers AS creators\n"
+ + " ON documents.creator_nm =
creators.depart_num\n"
+ + " AND documents.common_id =
creators.common_id");
+ }
+ /*
+ * Users LEFT JOIN Orders and Payments (both on user_id), then
+ * ProductCategories and ProductReviews (both on o.product). The query
+ * projects CASE expressions over the joined columns (product tier,
+ * recommendation status, final price).
+ *
+ * ProductCategories and ProductReviews are created here. Users, Orders and
+ * Payments are presumably registered in the test class setup (not visible
+ * here).
+ *
+ * The joins are keyed on user_id and on o.product, so there is no single
+ * common join key (hence the "no-common-join-key" tag).
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexConditionalLogicWithMultiJoinNoCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " is_premium BOOLEAN,"
+ + " discount_rate DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductReviews ("
+ + " review_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_id STRING,"
+ + " rating INT,"
+ + " is_verified BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "o.order_id, "
+ + "p.payment_id, "
+ + "pc.category_name, "
+ + "CASE "
+ + " WHEN pc.is_premium = true AND p.price > 1000 THEN
'High-Value Premium' "
+ + " WHEN pc.is_premium = true THEN 'Premium' "
+ + " WHEN p.price > 500 THEN 'Standard High-Value' "
+ + " ELSE 'Standard' "
+ + "END AS product_tier, "
+ + "CASE "
+ + " WHEN pr.rating >= 4 AND pr.is_verified = true
THEN 'Highly Recommended' "
+ + " WHEN pr.rating >= 3 THEN 'Recommended' "
+ + " WHEN pr.rating >= 2 THEN 'Average' "
+ + " ELSE 'Not Recommended' "
+ + "END AS recommendation_status, "
+ + "CASE "
+ + " WHEN pc.discount_rate > 0.2 THEN p.price * (1 -
pc.discount_rate) "
+ + " ELSE p.price "
+ + "END AS final_price "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductCategories pc ON o.product =
pc.category_id "
+ + "LEFT JOIN ProductReviews pr ON o.product =
pr.product_id");
+ }
+ /*
+ * Joins spread across CTEs, followed by an aggregation:
+ * - user_orders: Users LEFT JOIN Orders and Payments, both on user_id;
+ * - order_details: user_orders LEFT JOIN OrderStatus
+ *   (order_id = status_id) and PaymentMethods (payment_id = method_id);
+ * - final_summary: COUNT / SUM / AVG per (user_id, name).
+ *
+ * OrderStatus and PaymentMethods are created here. Users, Orders and
+ * Payments are presumably registered in the test class setup (not visible
+ * here). The joins are keyed on user_id, order_id and payment_id, so there
+ * is no single common join key.
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testComplexCTEWithMultiJoinNoCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderStatus ("
+ + " status_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " status_name STRING,"
+ + " is_final BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE PaymentMethods ("
+ + " method_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " method_name STRING,"
+ + " processing_fee DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "WITH user_orders AS ("
+ + " SELECT u.user_id, u.name, o.order_id, o.product,
p.payment_id, p.price "
+ + " FROM Users u "
+ + " LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + " LEFT JOIN Payments p ON u.user_id = p.user_id"
+ + "), "
+ + "order_details AS ("
+ + " SELECT uo.*, os.status_name, os.is_final,
pm.method_name, pm.processing_fee "
+ + " FROM user_orders uo "
+ + " LEFT JOIN OrderStatus os ON uo.order_id =
os.status_id "
+ + " LEFT JOIN PaymentMethods pm ON uo.payment_id =
pm.method_id"
+ + "), "
+ + "final_summary AS ("
+ + " SELECT "
+ + " user_id, "
+ + " name, "
+ + " COUNT(order_id) as total_orders, "
+ + " SUM(price) as total_spent, "
+ + " AVG(price) as avg_order_value, "
+ + " COUNT(CASE WHEN is_final = true THEN 1 END) as
completed_orders "
+ + " FROM order_details "
+ + " GROUP BY user_id, name"
+ + ") "
+ + "SELECT * FROM final_summary");
+ }
+ /*
+ * Users LEFT JOIN Orders (on user_id), OrderItems (on o.order_id),
+ * ProductCategories (oi.product_name = pc.category_id) and Payments
+ * (on user_id). The result is grouped by u.user_id, u.name and
+ * pc.category_name, with COUNT(DISTINCT), SUM, AVG, MAX, MIN and a HAVING
+ * filter on top of the joins.
+ *
+ * OrderItems and ProductCategories are created here. Users, Orders and
+ * Payments are presumably registered in the test class setup (not visible
+ * here). The joins are keyed on user_id, order_id and product_name, so
+ * there is no single common join key.
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testAggregationAndGroupingWithMultiJoinNoCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE OrderItems ("
+ + " item_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " order_id STRING,"
+ + " product_name STRING,"
+ + " quantity INT,"
+ + " unit_price DOUBLE"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductCategories ("
+ + " category_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " category_name STRING,"
+ + " parent_category STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "u.name, "
+ + "pc.category_name, "
+ + "COUNT(DISTINCT o.order_id) as order_count, "
+ + "SUM(oi.quantity) as total_items, "
+ + "SUM(oi.quantity * oi.unit_price) as total_value, "
+ + "AVG(oi.unit_price) as avg_item_price, "
+ + "MAX(p.price) as max_payment, "
+ + "MIN(p.price) as min_payment, "
+ + "COUNT(CASE WHEN oi.quantity > 5 THEN 1 END) as
bulk_orders "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN OrderItems oi ON o.order_id = oi.order_id
"
+ + "LEFT JOIN ProductCategories pc ON oi.product_name =
pc.category_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "GROUP BY u.user_id, u.name, pc.category_name "
+ + "HAVING COUNT(DISTINCT o.order_id) > 0");
+ }
+ /*
+ * Users LEFT JOIN Orders, Payments and UserPreferences (all on user_id)
+ * and ProductDetails (o.product = pd.product_id). The query projects
+ * string functions (UPPER, LOWER, CONCAT, SUBSTRING, CHAR_LENGTH,
+ * REGEXP_REPLACE), FLOOR arithmetic, CASE, TO_TIMESTAMP_LTZ and COALESCE
+ * over the joined columns.
+ *
+ * ProductDetails and UserPreferences are created here. Users, Orders and
+ * Payments are presumably registered in the test class setup (not visible
+ * here). The joins are keyed on user_id and on o.product, so there is no
+ * single common join key.
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("no-common-join-key")
+ void testFunctionAndExpressionWithMultiJoinNoCommonJoinKey() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE ProductDetails ("
+ + " product_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " product_name STRING,"
+ + " description STRING,"
+ + " created_date BIGINT,"
+ + " tags STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE UserPreferences ("
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " preferred_category STRING,"
+ + " notification_level STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + "u.user_id, "
+ + "UPPER(u.name) as user_name_upper, "
+ + "LOWER(o.product) as product_lower, "
+ + "CONCAT(u.name, ' - ', o.product) as user_product, "
+ + "SUBSTRING(pd.description, 1, 50) as
description_preview, "
+ + "CHAR_LENGTH(pd.description) as description_length, "
+ + "FLOOR(p.price / 100.0) * 100 as price_rounded, "
+ + "CASE "
+ + " WHEN p.price > 1000 THEN 'High' "
+ + " WHEN p.price > 500 THEN 'Medium' "
+ + " ELSE 'Low' "
+ + "END as price_tier, "
+ + "REGEXP_REPLACE(pd.tags, ',', ' | ') as
formatted_tags, "
+ + "TO_TIMESTAMP_LTZ(pd.created_date, 3) as
product_created, "
+ + "COALESCE(up.preferred_category, 'None') as
user_preference, "
+ + "CASE "
+ + " WHEN up.notification_level = 'HIGH' THEN
'Frequent Updates' "
+ + " WHEN up.notification_level = 'MEDIUM' THEN 'Daily
Updates' "
+ + " ELSE 'Weekly Updates' "
+ + "END as notification_frequency "
+ + "FROM Users u "
+ + "LEFT JOIN Orders o ON u.user_id = o.user_id "
+ + "LEFT JOIN Payments p ON u.user_id = p.user_id "
+ + "LEFT JOIN ProductDetails pd ON o.product =
pd.product_id "
+ + "LEFT JOIN UserPreferences up ON u.user_id =
up.user_id");
+ }
+
+ /*
+ * Four-way LEFT JOIN of Developers, SupportTickets, Feedback and
+ * Subscriptions, all created by this test. Every join condition compares
+ * nested ROW fields against d.person.info.id, e.g.
+ * d.person.info.id = s.reporter.info.id.
+ *
+ * Calcite automatically generates LogicalProject nodes for nested field
+ * access. As a result, each join input in this test is wrapped in a
+ * projection, which prevents the planner from fusing all joins into a
+ * single MultiJoin node initially. Therefore, in this test, each Join is
+ * still converted to a MultiJoin individually (hence the
+ * "multijoin-chain-expected" tag).
+ *
+ * NOTE(review): the SQL fragments below contain no "\n", so the query is
+ * recorded as one long line in the expected-plan XML; consider adding line
+ * breaks (and regenerating the XML accordingly).
+ */
+ @Test
+ @Tag("multijoin-chain-expected")
+ void testJoinConditionHasNestedFieldsMultiJoinChainExpected() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Developers ("
+ + " developer_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " person ROW<info ROW<id STRING, name
STRING, region STRING>>,"
+ + " experience_years INT"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE SupportTickets ("
+ + " ticket_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " reporter ROW<info ROW<id STRING, priority
STRING>>,"
+ + " issue STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Feedback ("
+ + " feedback_id STRING PRIMARY KEY NOT
ENFORCED,"
+ + " author ROW<info ROW<id STRING, rating
INT>>,"
+ + " message STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Subscriptions ("
+ + " sub_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " subscriber ROW<info ROW<id STRING, plan
STRING>>,"
+ + " active BOOLEAN"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,UA,D')");
+
+ util.verifyRelPlan(
+ "SELECT "
+ + " d.developer_id, "
+ + " d.person.info.name AS developer_name, "
+ + " s.ticket_id, "
+ + " s.reporter.info.priority AS ticket_priority, "
+ + " f.feedback_id, "
+ + " f.author.info.rating AS feedback_rating, "
+ + " sub.sub_id, "
+ + " sub.subscriber.info.plan AS subscription_plan "
+ + "FROM Developers AS d "
+ + "LEFT JOIN SupportTickets AS s "
+ + " ON d.person.info.id = s.reporter.info.id "
+ + "LEFT JOIN Feedback AS f "
+ + " ON d.person.info.id = f.author.info.id "
+ + "LEFT JOIN Subscriptions AS sub "
+ + " ON d.person.info.id = sub.subscriber.info.id");
+ }
Review Comment:
Could you please insert line breaks (EOLs) into this SQL query and into the other ones as well?
Otherwise one has to scroll a lot just to see the query in the plan XML.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]