This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c446291ccf43e1c003ba6f402df754f624f89754
Author: Daniel Vanko <[email protected]>
AuthorDate: Tue Feb 18 18:45:15 2025 +0100

    IMPALA-13074: Add sink node to Web UI's graphical plan for DDL/DML queries
    
    From now on, if a plan fragment's root data sink is a table sink, a
    multi data sink, or a merge sink, it will be included in the JSON
    response and shown on the Web UI as the parent of the plan fragment.
    
    Testing
     * adapted and refined impala-http-handler-test
     * added new tests for related sink types
     * tested manually on WebUI with
       - CTAS statements
       - UPDATE statements on Iceberg tables
       - DELETE statements on Iceberg tables
       - MERGE statements on Iceberg tables
    
    Change-Id: Ib2bd442f6499efde7406d87c2b1fd1b46a45381b
    Reviewed-on: http://gerrit.cloudera.org:8080/22496
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Noemi Pap-Takacs <[email protected]>
    Reviewed-by: Zoltan Borok-Nagy <[email protected]>
---
 be/src/service/impala-http-handler-test.cc | 504 +++++++++++++++++++++++++----
 be/src/service/impala-http-handler.cc      | 170 ++++++----
 common/thrift/Query.thrift                 |   4 +-
 3 files changed, 561 insertions(+), 117 deletions(-)

diff --git a/be/src/service/impala-http-handler-test.cc 
b/be/src/service/impala-http-handler-test.cc
index 69a5738c6..4524620b7 100644
--- a/be/src/service/impala-http-handler-test.cc
+++ b/be/src/service/impala-http-handler-test.cc
@@ -85,9 +85,9 @@ class ImpalaHttpHandlerTest : public testing::Test {
     if (id % 2 == 0) {
       node_summary.__set_is_broadcast(true);
     }
-    addStatsToSummary(id, &node_summary);
+    addStatsToSummary(id + 1, &node_summary);
     if (id % 3 == 0) {
-      addStatsToSummary(id + 1, &node_summary);
+      addStatsToSummary(id + 2, &node_summary);
     }
     return node_summary;
   }
@@ -95,12 +95,12 @@ class ImpalaHttpHandlerTest : public testing::Test {
   // Should be kept synced with the logic in createTPlanNodeExecSummary.
   static void checkPlanNodeStats(const Value& plan_node, TPlanNodeId id) {
     ASSERT_TRUE(plan_node.IsObject());
-    int output_card = (id + 1) * 2;
+    int output_card = (id + 2) * 2;
     if (id % 3 == 0) {
       if (id % 2 == 0) {
-        output_card = (id + 2) * 2;
+        output_card = (id + 3) * 2;
       } else {
-        output_card += (id + 2) * 2;
+        output_card += (id + 3) * 2;
       }
     }
     EXPECT_EQ(plan_node["output_card"].GetInt(), output_card);
@@ -117,7 +117,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
       EXPECT_FALSE(plan_node.HasMember("is_broadcast"));
     }
 
-    int max_time = id % 3 == 0 ? (id + 2) * 2000 : (id + 1) * 2000;
+    int max_time = id % 3 == 0 ? (id + 3) * 2000 : (id + 2) * 2000;
 
     EXPECT_EQ(plan_node["max_time_val"].GetInt(), max_time);
     EXPECT_EQ(
@@ -144,6 +144,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
     TDataSink sink;
     TDataStreamSink stream_sink;
     stream_sink.__set_dest_node_id(dest_node_id);
+    sink.__set_label("EXCHANGE SENDER");
     sink.__set_type(TDataSinkType::DATA_STREAM_SINK);
     sink.__set_stream_sink(stream_sink);
     return sink;
@@ -152,6 +153,7 @@ class ImpalaHttpHandlerTest : public testing::Test {
   static TDataSink createPlanRootSink() {
     TDataSink sink;
     TPlanRootSink plan_root_sink;
+    sink.__set_label("PLAN-ROOT SINK");
     sink.__set_type(TDataSinkType::PLAN_ROOT_SINK);
     sink.__set_plan_root_sink(plan_root_sink);
     return sink;
@@ -161,13 +163,44 @@ class ImpalaHttpHandlerTest : public testing::Test {
     TDataSink sink;
     TTableSink table_sink;
     THdfsTableSink hdfs_table_sink;
+    table_sink.__set_action(TSinkAction::INSERT);
     table_sink.__set_type(TTableSinkType::HDFS);
     table_sink.__set_hdfs_table_sink(hdfs_table_sink);
+    sink.__set_label("HDFS WRITER");
     sink.__set_type(TDataSinkType::TABLE_SINK);
     sink.__set_table_sink(table_sink);
     return sink;
   }
 
+  static TDataSink createIcebergDeleteTableSink() {
+    TDataSink sink;
+    TTableSink table_sink;
+    TIcebergDeleteSink iceberg_delete_sink;
+    table_sink.__set_action(TSinkAction::DELETE);
+    table_sink.__set_type(TTableSinkType::HDFS);
+    table_sink.__set_iceberg_delete_sink(iceberg_delete_sink);
+    sink.__set_label("ICEBERG BUFFERED DELETER");
+    sink.__set_type(TDataSinkType::TABLE_SINK);
+    sink.__set_table_sink(table_sink);
+    return sink;
+  }
+
+  static TDataSink createMultiDataSink(const vector<TDataSink>& child_sinks) {
+    TDataSink sink;
+    sink.__set_type(TDataSinkType::MULTI_DATA_SINK);
+    sink.__set_label("MULTI DATA SINK");
+    sink.__set_child_data_sinks(child_sinks);
+    return sink;
+  }
+
+  static TDataSink createMergeSink(const vector<TDataSink>& child_sinks) {
+    TDataSink sink;
+    sink.__set_type(TDataSinkType::MERGE_SINK);
+    sink.__set_label("MERGE SINK");
+    sink.__set_child_data_sinks(child_sinks);
+    return sink;
+  }
+
   static void addSinkToFragment(
       const TDataSink& sink, TPlanFragment* fragment, TExecSummary* summary) {
     fragment->__set_output_sink(sink);
@@ -258,22 +291,22 @@ const string ImpalaHttpHandlerTest::PLAN_SCHEMA_JSON = 
R"({
 //     {
 //       "label": "01:EXCHANGE",
 //       "label_detail": "UNPARTITIONED",
-//       "output_card": 4,
+//       "output_card": 6,
 //       "num_instances": 2,
 //       "is_broadcast": true,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "3.000us",
+//       "max_time": "6.000us",
+//       "max_time_val": 6000,
+//       "avg_time": "5.000us",
 //       "children": []
 //     },
 //     {
 //       "label": "00:SCAN HDFS",
 //       "label_detail": "default.table",
-//       "output_card": 4,
+//       "output_card": 6,
 //       "num_instances": 1,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "4.000us",
+//       "max_time": "6.000us",
+//       "max_time_val": 6000,
+//       "avg_time": "6.000us",
 //       "children": [],
 //       "data_stream_target": "01:EXCHANGE"
 //     }
@@ -321,24 +354,35 @@ static void prepareSelectStatement(const vector<string>& 
labels,
 // {
 //   "plan_nodes": [
 //     {
-//       "label": "01:EXCHANGE",
-//       "label_detail": "UNPARTITIONED",
-//       "output_card": 4,
-//       "num_instances": 2,
-//       "is_broadcast": true,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "3.000us",
-//       "children": []
+//       "label": "HDFS WRITER",
+//       "label_detail": "INSERT",
+//       "output_card": 2,
+//       "num_instances": 1,
+//       "max_time": "2.000us",
+//       "max_time_val": 2000,
+//       "avg_time": "2.000us",
+//       "children": [
+//         {
+//           "label": "01:EXCHANGE",
+//           "label_detail": "UNPARTITIONED",
+//           "output_card": 6,
+//           "num_instances": 2,
+//           "is_broadcast": true,
+//           "max_time": "6.000us",
+//           "max_time_val": 6000,
+//           "avg_time": "5.000us",
+//           "children": []
+//         }
+//       ]
 //     },
 //     {
 //       "label": "00:SCAN HDFS",
 //       "label_detail": "default.table",
-//       "output_card": 4,
+//       "output_card": 6,
 //       "num_instances": 1,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "4.000us",
+//       "max_time": "6.000us",
+//       "max_time_val": 6000,
+//       "avg_time": "6.000us",
 //       "children": [],
 //       "data_stream_target": "01:EXCHANGE"
 //     }
@@ -370,8 +414,7 @@ static void prepareCreateTableAsSelectStatement(const 
vector<string>& labels,
   fragments->push_back(fragment00);
 }
 
-// Prepares query plan for a join statement with one fragment and the join 
node having
-// two children.
+// Prepares query plan for a join statement with three fragments.
 //
 // F02:PLAN FRAGMENT
 // |
@@ -396,42 +439,42 @@ static void prepareCreateTableAsSelectStatement(const 
vector<string>& labels,
 //     {
 //       "label": "04:EXCHANGE",
 //       "label_detail": "UNPARTITIONED",
-//       "output_card": 4,
+//       "output_card": 6,
 //       "num_instances": 2,
 //       "is_broadcast": true,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "3.000us",
+//       "max_time": "6.000us",
+//       "max_time_val": 6000,
+//       "avg_time": "5.000us",
 //       "children": []
 //     },
 //     {
 //       "label": "02:HASH_JOIN",
 //       "label_detail": "INNER JOIN, BROADCAST",
-//       "output_card": 4,
+//       "output_card": 6,
 //       "num_instances": 1,
-//       "max_time": "4.000us",
-//       "max_time_val": 4000,
-//       "avg_time": "4.000us",
+//       "max_time": "6.000us",
+//       "max_time_val": 6000,
+//       "avg_time": "6.000us",
 //       "children": [
 //         {
 //           "label": "01:SCAN HDFS",
 //           "label_detail": "default.table1 t1",
-//           "output_card": 6,
+//           "output_card": 8,
 //           "num_instances": 1,
 //           "is_broadcast": true,
-//           "max_time": "6.000us",
-//           "max_time_val": 6000,
-//           "avg_time": "6.000us",
+//           "max_time": "8.000us",
+//           "max_time_val": 8000,
+//           "avg_time": "8.000us",
 //           "children": []
 //         },
 //         {
 //           "label": "03:EXCHANGE",
 //           "label_detail": "BROADCAST",
-//           "output_card": 18,
+//           "output_card": 22,
 //           "num_instances": 2,
-//           "max_time": "10.000us",
-//           "max_time_val": 10000,
-//           "avg_time": "9.000us",
+//           "max_time": "12.000us",
+//           "max_time_val": 12000,
+//           "avg_time": "11.000us",
 //           "children": []
 //         }
 //       ],
@@ -440,12 +483,12 @@ static void prepareCreateTableAsSelectStatement(const 
vector<string>& labels,
 //     {
 //       "label": "00:SCAN HDFS",
 //       "label_detail": "default.table2 t2",
-//       "output_card": 10,
+//       "output_card": 12,
 //       "num_instances": 1,
 //       "is_broadcast": true,
-//       "max_time": "10.000us",
-//       "max_time_val": 10000,
-//       "avg_time": "10.000us",
+//       "max_time": "12.000us",
+//       "max_time_val": 12000,
+//       "avg_time": "12.000us",
 //       "children": [],
 //       "data_stream_target": "03:EXCHANGE"
 //     }
@@ -494,6 +537,194 @@ static void prepareJoinStatement(const vector<string>& 
labels,
   fragments->push_back(fragment01);
 }
 
+// Prepares query plan for an Iceberg update statement. Root sink will be a 
multi data
+// sink, consisting of an HDFS table sink and an Iceberg delete table sink.
+//
+// F00:PLAN FRAGMENT
+// |
+// MULTI DATA SINK
+// |->WRITE TO HDFS
+// |
+// |->BUFFERED DELETE FROM ICEBERG
+// |
+// 00:SCAN HDFS
+//
+// Expected JSON output for the plan will look like this:
+// {
+//   "plan_nodes": [
+//     {
+//       "label": "MULTI DATA SINK",
+//       "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
+//       "output_card": 2,
+//       "num_instances": 1,
+//       "max_time": "2.000us",
+//       "max_time_val": 2000,
+//       "avg_time": "2.000us",
+//       "children": [
+//         {
+//           "label": "00:SCAN HDFS",
+//           "label_detail": "default.table",
+//           "output_card": 6,
+//           "num_instances": 2,
+//           "is_broadcast": true,
+//           "max_time": "6.000us",
+//           "max_time_val": 6000,
+//           "avg_time": "5.000us",
+//           "children": []
+//         }
+//       ]
+//     }
+//   ]
+// }
+static void prepareUpdateStatement(const vector<string>& labels,
+    const vector<string>& label_details, vector<TPlanFragment>* fragments,
+    TExecSummary* summary) {
+  ASSERT_EQ(labels.size(), 1);
+  ASSERT_EQ(label_details.size(), 1);
+
+  // F00:PLAN FRAGMENT
+  TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
+  TDataSink multi_data_sink = ImpalaHttpHandlerTest::createMultiDataSink(
+      {ImpalaHttpHandlerTest::createHdfsTableSink(),
+          ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
+  ImpalaHttpHandlerTest::addSinkToFragment(multi_data_sink, &fragment00, 
summary);
+
+  // 00:SCAN HDFS
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(0), label_details.at(0), 0, &fragment00, summary);
+  fragments->push_back(fragment00);
+}
+
+// Prepares query plan for an Iceberg merge statement. The root sink will be a
+// merge sink, consisting of an HDFS writer and an Iceberg buffered delete 
sink.
+//
+// F00:PLAN FRAGMENT
+// |
+// MERGE SINK
+// |->WRITE TO HDFS
+// |
+// |->BUFFERED DELETE FROM ICEBERG
+// |
+// 03:MERGE
+// |
+// 02:HASH JOIN
+// |
+// |--04:EXCHANGE
+// |  |
+// |  F01:PLAN FRAGMENT
+// |  01:SCAN HDFS
+// |
+// 00:SCAN HDFS [functional_parquet.target, RANDOM]
+//
+// Expected JSON output for the plan will look like this:
+// {
+//   "plan_nodes": [
+//     {
+//       "label": "MERGE SINK",
+//       "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER",
+//       "output_card": 2,
+//       "num_instances": 1,
+//       "max_time": "2.000us",
+//       "max_time_val": 2000,
+//       "avg_time": "2.000us",
+//       "children": [
+//         {
+//           "label": "03:MERGE",
+//           "label_detail": "",
+//           "output_card": 6,
+//           "num_instances": 2,
+//           "is_broadcast": true,
+//           "max_time": "6.000us",
+//           "max_time_val": 6000,
+//           "avg_time": "5.000us",
+//           "children": [
+//             {
+//               "label": "02:HASH JOIN",
+//               "label_detail": "INNER JOIN, BROADCAST",
+//               "output_card": 6,
+//               "num_instances": 1,
+//               "max_time": "6.000us",
+//               "max_time_val": 6000,
+//               "avg_time": "6.000us",
+//               "children": [
+//                 {
+//                   "label": "04:EXCHANGE",
+//                   "label_detail": "BROADCAST",
+//                   "output_card": 8,
+//                   "num_instances": 1,
+//                   "is_broadcast": true,
+//                   "max_time": "8.000us",
+//                   "max_time_val": 8000,
+//                   "avg_time": "8.000us",
+//                   "children": []
+//                 },
+//                 {
+//                   "label": "00:SCAN HDFS",
+//                   "label_detail": "default.target",
+//                   "output_card": 22,
+//                   "num_instances": 2,
+//                   "max_time": "12.000us",
+//                   "max_time_val": 12000,
+//                   "avg_time": "11.000us",
+//                   "children": []
+//                 }
+//               ]
+//             }
+//           ]
+//         }
+//       ]
+//     },
+//     {
+//       "label": "01:SCAN HDFS",
+//       "label_detail": "default.source",
+//       "output_card": 12,
+//       "num_instances": 1,
+//       "is_broadcast": true,
+//       "max_time": "12.000us",
+//       "max_time_val": 12000,
+//       "avg_time": "12.000us",
+//       "children": [],
+//       "data_stream_target": "04:EXCHANGE"
+//     }
+//   ]
+// }
+static void prepareMergeStatement(const vector<string>& labels,
+    const vector<string>& label_details, vector<TPlanFragment>* fragments,
+    TExecSummary* summary) {
+  ASSERT_EQ(labels.size(), 5);
+  ASSERT_EQ(label_details.size(), 5);
+
+  // F00:PLAN FRAGMENT
+  TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0);
+  TDataSink merge_sink = ImpalaHttpHandlerTest::createMergeSink(
+      {ImpalaHttpHandlerTest::createHdfsTableSink(),
+          ImpalaHttpHandlerTest::createIcebergDeleteTableSink()});
+  ImpalaHttpHandlerTest::addSinkToFragment(merge_sink, &fragment00, summary);
+  // 03:MERGE
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(0), label_details.at(0), 1, &fragment00, summary);
+  // 02:HASH JOIN
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(1), label_details.at(1), 2, &fragment00, summary);
+  // 04:EXCHANGE
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(2), label_details.at(2), 0, &fragment00, summary);
+  // 00:SCAN HDFS
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(3), label_details.at(3), 0, &fragment00, summary);
+  fragments->push_back(fragment00);
+
+  // F01:PLAN FRAGMENT
+  TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(1);
+  TDataSink stream_sink =
+      
ImpalaHttpHandlerTest::createStreamSink(fragment00.plan.nodes[2].node_id);
+  ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment01, summary);
+  // 01:SCAN HDFS
+  ImpalaHttpHandlerTest::addNodeToFragment(
+      labels.at(4), label_details.at(4), 0, &fragment01, summary);
+  fragments->push_back(fragment01);
+}
+
 } // namespace impala
 
 TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
@@ -512,8 +743,9 @@ TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
 
   validatePlanSchema(document);
 
+  // Check the two nodes in the plan
   auto array = document["plan_nodes"].GetArray();
-  EXPECT_EQ(array.Size(), 2);
+  ASSERT_EQ(array.Size(), 2);
   for (size_t i = 0; i < array.Size(); ++i) {
     EXPECT_EQ(array[i]["label"].GetString(), SELECT_LABELS.at(i));
     EXPECT_EQ(array[i]["label_detail"].GetString(), 
SELECT_LABEL_DETAILS.at(i));
@@ -522,6 +754,10 @@ TEST_F(ImpalaHttpHandlerTest, SelectStatement) {
 
     EXPECT_EQ(array[i]["children"].Size(), 0);
   }
+
+  // The data_stream_target of 00:SCAN HDFS should point to the 01:EXCHANGE 
node
+  ASSERT_TRUE(array[1].HasMember("data_stream_target"));
+  EXPECT_EQ(array[1]["data_stream_target"].GetString(), SELECT_LABELS[0]);
 }
 
 TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) {
@@ -543,14 +779,37 @@ TEST_F(ImpalaHttpHandlerTest, 
CreateTableAsSelectStatement) {
 
   auto array = document["plan_nodes"].GetArray();
   EXPECT_EQ(array.Size(), 2);
-  for (size_t i = 0; i < array.Size(); ++i) {
-    EXPECT_EQ(array[i]["label"].GetString(), CTAS_LABELS.at(i));
-    EXPECT_EQ(array[i]["label_detail"].GetString(), CTAS_LABEL_DETAILS.at(i));
 
-    checkPlanNodeStats(array[i], i);
+  // Check the HDFS WRITER node
+  ASSERT_GE(array.Size(), 1);
+  auto& hdfs_writer_node = array[0];
+  EXPECT_STREQ(hdfs_writer_node["label"].GetString(), "HDFS WRITER");
+  EXPECT_STREQ(hdfs_writer_node["label_detail"].GetString(), "INSERT");
 
-    EXPECT_EQ(array[i]["children"].Size(), 0);
-  }
+  checkPlanNodeStats(hdfs_writer_node, -1);
+
+  // Check 01:EXCHANGE, which is the child of HDFS WRITER node
+  EXPECT_EQ(hdfs_writer_node["children"].Size(), 1);
+
+  auto& exch01_node = hdfs_writer_node["children"][0];
+  EXPECT_EQ(exch01_node["label"].GetString(), CTAS_LABELS[0]);
+  EXPECT_EQ(exch01_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[0]);
+
+  checkPlanNodeStats(exch01_node, 0);
+  EXPECT_EQ(exch01_node["children"].Size(), 0);
+
+  // Check the 00:SCAN HDFS node
+  ASSERT_GE(array.Size(), 2);
+  auto& scan_node = array[1];
+  EXPECT_EQ(scan_node["label"].GetString(), CTAS_LABELS[1]);
+  EXPECT_EQ(scan_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[1]);
+
+  checkPlanNodeStats(scan_node, 1);
+  EXPECT_EQ(scan_node["children"].Size(), 0);
+
+  // The data_stream_target should point to the 01:EXCHANGE node
+  ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
+  EXPECT_EQ(scan_node["data_stream_target"].GetString(), CTAS_LABELS[0]);
 }
 
 TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
@@ -575,6 +834,7 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
   EXPECT_EQ(array.Size(), 3);
 
   // Check the 04:EXCHANGE node
+  ASSERT_GE(array.Size(), 1);
   auto& exch04_node = array[0];
   EXPECT_EQ(exch04_node["label"].GetString(), JOIN_LABELS[0]);
   EXPECT_EQ(exch04_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[0]);
@@ -582,18 +842,21 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
   checkPlanNodeStats(exch04_node, 0);
   EXPECT_EQ(exch04_node["children"].Size(), 0);
 
-  // Check the 02:HASH_JOIN node
+  // Check the 02:HASH JOIN node
+  ASSERT_GE(array.Size(), 2);
   auto& join_node = array[1];
   EXPECT_EQ(join_node["label"].GetString(), JOIN_LABELS[1]);
   EXPECT_EQ(join_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[1]);
 
   checkPlanNodeStats(join_node, 1);
-  EXPECT_TRUE(join_node.HasMember("data_stream_target"));
+
+  // The data_stream_target of 02:HASH JOIN should point to the 04:EXCHANGE 
node
+  ASSERT_TRUE(join_node.HasMember("data_stream_target"));
   EXPECT_EQ(join_node["data_stream_target"], JOIN_LABELS[0]);
 
-  // Check the two children of join node
+  // Check the two children of 02:HASH JOIN node
   auto children = join_node["children"].GetArray();
-  EXPECT_EQ(children.Size(), 2);
+  ASSERT_EQ(children.Size(), 2);
 
   for (size_t i = 0; i < children.Size(); ++i) {
     EXPECT_EQ(children[i]["label"].GetString(), JOIN_LABELS.at(i + 2));
@@ -605,12 +868,131 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) {
   }
 
   // Check the 00:SCAN HDFS node
+  ASSERT_GE(array.Size(), 3);
   auto& scan_node = array[2];
   EXPECT_EQ(scan_node["label"].GetString(), JOIN_LABELS[4]);
   EXPECT_EQ(scan_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[4]);
 
   checkPlanNodeStats(scan_node, 4);
   EXPECT_EQ(scan_node["children"].Size(), 0);
-  EXPECT_TRUE(join_node.HasMember("data_stream_target"));
+
+  // The data_stream_target of 00:SCAN HDFS should point to the 03:EXCHANGE 
node
+  ASSERT_TRUE(join_node.HasMember("data_stream_target"));
   EXPECT_EQ(scan_node["data_stream_target"].GetString(), JOIN_LABELS[3]);
+}
+
+TEST_F(ImpalaHttpHandlerTest, UpdateStatement) {
+  const vector<string> UPDATE_LABELS = {"00:SCAN HDFS"};
+  const vector<string> UPDATE_LABEL_DETAILS = {"default.table"};
+  vector<TPlanFragment> fragments;
+  TExecSummary summary;
+
+  prepareUpdateStatement(UPDATE_LABELS, UPDATE_LABEL_DETAILS, &fragments, 
&summary);
+
+  Document document;
+  Value value(kObjectType);
+
+  PlanToJson(fragments, summary, &document, &value);
+  document.CopyFrom(value, document.GetAllocator());
+
+  validatePlanSchema(document);
+
+  auto array = document["plan_nodes"].GetArray();
+
+  // Check the MULTI DATA SINK node
+  ASSERT_EQ(array.Size(), 1);
+
+  auto& multi_data_sink_node = array[0];
+  EXPECT_STREQ(multi_data_sink_node["label"].GetString(), "MULTI DATA SINK");
+  EXPECT_STREQ(multi_data_sink_node["label_detail"].GetString(),
+      "HDFS WRITER, ICEBERG BUFFERED DELETER");
+
+  checkPlanNodeStats(multi_data_sink_node, -1);
+
+  // Check 00:SCAN HDFS node, which is the child of MULTI DATA SINK node
+  ASSERT_EQ(multi_data_sink_node["children"].Size(), 1);
+
+  auto& scan_node = multi_data_sink_node["children"][0];
+  EXPECT_EQ(scan_node["label"].GetString(), UPDATE_LABELS[0]);
+  EXPECT_EQ(scan_node["label_detail"].GetString(), UPDATE_LABEL_DETAILS[0]);
+
+  checkPlanNodeStats(scan_node, 0);
+  EXPECT_EQ(scan_node["children"].Size(), 0);
+}
+
+TEST_F(ImpalaHttpHandlerTest, MergeStatement) {
+  const vector<string> MERGE_LABELS = {
+      "03:MERGE", "02:HASH JOIN", "04:EXCHANGE", "00:SCAN HDFS", "01:SCAN 
HDFS"};
+  const vector<string> MERGE_LABEL_DETAILS = {
+      "", "INNER JOIN, BROADCAST", "BROADCAST", "default.target", 
"default.source"};
+  vector<TPlanFragment> fragments;
+  TExecSummary summary;
+
+  prepareMergeStatement(MERGE_LABELS, MERGE_LABEL_DETAILS, &fragments, 
&summary);
+
+  Document document;
+  Value value(kObjectType);
+
+  PlanToJson(fragments, summary, &document, &value);
+  document.CopyFrom(value, document.GetAllocator());
+
+  validatePlanSchema(document);
+
+  auto array = document["plan_nodes"].GetArray();
+  EXPECT_EQ(array.Size(), 2);
+
+  // Check the MERGE SINK node
+  ASSERT_GE(array.Size(), 1);
+
+  auto& merge_sink_node = array[0];
+  EXPECT_STREQ(merge_sink_node["label"].GetString(), "MERGE SINK");
+  EXPECT_STREQ(merge_sink_node["label_detail"].GetString(),
+      "HDFS WRITER, ICEBERG BUFFERED DELETER");
+
+  checkPlanNodeStats(merge_sink_node, -1);
+
+  // Check 03:MERGE node, which is the child of the MERGE SINK node
+  ASSERT_EQ(merge_sink_node["children"].Size(), 1);
+
+  auto& merge_node = merge_sink_node["children"][0];
+  EXPECT_EQ(merge_node["label"].GetString(), MERGE_LABELS[0]);
+  EXPECT_EQ(merge_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[0]);
+
+  checkPlanNodeStats(merge_node, 0);
+
+  // Check the 02:HASH JOIN node, which is a child of the 03:MERGE node
+  ASSERT_EQ(merge_node["children"].Size(), 1);
+
+  auto& hash_join_node = merge_node["children"][0];
+  EXPECT_EQ(hash_join_node["label"].GetString(), MERGE_LABELS[1]);
+  EXPECT_EQ(hash_join_node["label_detail"].GetString(), 
MERGE_LABEL_DETAILS[1]);
+
+  checkPlanNodeStats(hash_join_node, 1);
+
+  // Check the two children of the 02:HASH JOIN node
+  auto children = hash_join_node["children"].GetArray();
+  ASSERT_EQ(children.Size(), 2);
+
+  for (size_t i = 0; i < children.Size(); ++i) {
+    EXPECT_EQ(children[i]["label"].GetString(), MERGE_LABELS.at(i + 2));
+    EXPECT_EQ(children[i]["label_detail"].GetString(), 
MERGE_LABEL_DETAILS.at(i + 2));
+
+    checkPlanNodeStats(children[i], i + 2);
+
+    EXPECT_EQ(children[i]["children"].Size(), 0);
+  }
+
+  // Check the 01:SCAN HDFS node
+  ASSERT_EQ(array.Size(), 2);
+
+  auto& scan_node = array[1];
+  EXPECT_EQ(scan_node["label"].GetString(), MERGE_LABELS[4]);
+  EXPECT_EQ(scan_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[4]);
+
+  checkPlanNodeStats(scan_node, 4);
+  EXPECT_EQ(scan_node["children"].Size(), 0);
+
+  // The data_stream_target of 01:SCAN HDFS should point to the 04:EXCHANGE 
node
+  ASSERT_TRUE(scan_node.HasMember("data_stream_target"));
+  EXPECT_EQ(scan_node["data_stream_target"].GetString(), MERGE_LABELS[2]);
 }
\ No newline at end of file
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index 80cabca08..849a9c716 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -1083,12 +1083,50 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const 
Webserver::WebRequest& req,
 
 namespace {
 
+// Summary is stored with -1 as id if it is for a data sink at the root of a 
fragment.
+constexpr int SINK_ID = -1;
+
+void ExecStatsToJsonHelper(
+    const TPlanNodeExecSummary& summary, rapidjson::Document* document, Value* 
value) {
+  int64_t cardinality = 0;
+  int64_t max_time = 0;
+  int64_t total_time = 0;
+  for (const TExecStats& stat : summary.exec_stats) {
+    if (summary.is_broadcast) {
+      // Avoid multiple-counting for recipients of broadcasts.
+      cardinality = ::max(cardinality, stat.cardinality);
+    } else {
+      cardinality += stat.cardinality;
+    }
+    total_time += stat.latency_ns;
+    max_time = ::max(max_time, stat.latency_ns);
+  }
+  value->AddMember("output_card", cardinality, document->GetAllocator());
+  value->AddMember("num_instances", 
static_cast<uint64_t>(summary.exec_stats.size()),
+      document->GetAllocator());
+  if (summary.is_broadcast) {
+    value->AddMember("is_broadcast", true, document->GetAllocator());
+  }
+
+  const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS);
+  Value max_time_str_json(max_time_str, document->GetAllocator());
+  value->AddMember("max_time", max_time_str_json, document->GetAllocator());
+  value->AddMember("max_time_val", max_time, document->GetAllocator());
+
+  // Round to the nearest ns, to workaround a bug in pretty-printing a 
fraction of a
+  // ns. See IMPALA-1800.
+  const string& avg_time_str = PrettyPrinter::Print(
+      // A bug may occasionally cause 1-instance nodes to appear to have 0 
instances.
+      total_time / ::max(static_cast<int>(summary.exec_stats.size()), 1), 
TUnit::TIME_NS);
+  Value avg_time_str_json(avg_time_str, document->GetAllocator());
+  value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
+}
+
 // Helper for PlanToJson(), processes a single list of plan nodes which are the
 // DFS-flattened representation of a single plan fragment. Called recursively, 
the
 // iterator parameter is updated in place so that when a recursive call 
returns, the
 // caller is pointing at the next of its children.
 void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
-    const vector<TPlanNode>& nodes,
     vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, 
Value* value) {
   Value children(kArrayType);
   Value label((*it)->label, document->GetAllocator());
@@ -1098,54 +1136,67 @@ void PlanToJsonHelper(const map<TPlanNodeId, 
TPlanNodeExecSummary>& summaries,
   value->AddMember("label_detail", label_detail, document->GetAllocator());
 
   TPlanNodeId id = (*it)->node_id;
-  map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = 
summaries.find(id);
-  if (summary != summaries.end()) {
-    int64_t cardinality = 0;
-    int64_t max_time = 0L;
-    int64_t total_time = 0;
-    for (const TExecStats& stat: summary->second.exec_stats) {
-      if (summary->second.is_broadcast) {
-        // Avoid multiple-counting for recipients of broadcasts.
-        cardinality = ::max(cardinality, stat.cardinality);
-      } else {
-        cardinality += stat.cardinality;
-      }
-      total_time += stat.latency_ns;
-      max_time = ::max(max_time, stat.latency_ns);
-    }
-    value->AddMember("output_card", cardinality, document->GetAllocator());
-    value->AddMember("num_instances",
-        static_cast<uint64_t>(summary->second.exec_stats.size()),
-        document->GetAllocator());
-    if (summary->second.is_broadcast) {
-      value->AddMember("is_broadcast", true, document->GetAllocator());
-    }
-
-    const string& max_time_str = PrettyPrinter::Print(max_time, 
TUnit::TIME_NS);
-    Value max_time_str_json(max_time_str, document->GetAllocator());
-    value->AddMember("max_time", max_time_str_json, document->GetAllocator());
-    value->AddMember("max_time_val", max_time, document->GetAllocator());
-
-    // Round to the nearest ns, to workaround a bug in pretty-printing a 
fraction of a
-    // ns. See IMPALA-1800.
-    const string& avg_time_str = PrettyPrinter::Print(
-        // A bug may occasionally cause 1-instance nodes to appear to have 0 
instances.
-        total_time / 
::max(static_cast<int>(summary->second.exec_stats.size()), 1),
-        TUnit::TIME_NS);
-    Value avg_time_str_json(avg_time_str, document->GetAllocator());
-    value->AddMember("avg_time", avg_time_str_json, document->GetAllocator());
+  map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it = 
summaries.find(id);
+  if (summary_it != summaries.end()) {
+    ExecStatsToJsonHelper(summary_it->second, document, value);
   }
 
   int num_children = (*it)->num_children;
   for (int i = 0; i < num_children; ++i) {
     ++(*it);
     Value container(kObjectType);
-    PlanToJsonHelper(summaries, nodes, it, document, &container);
+    PlanToJsonHelper(summaries, it, document, &container);
     children.PushBack(container, document->GetAllocator());
   }
   value->AddMember("children", children, document->GetAllocator());
 }
 
+// Helper for PlanToJson(), called only when the plan fragment's root data 
sink must be
+// one of the following types:
+//  - table sink,
+//  - multi data sink,
+//  - merge sink.
+// Plan nodes of the plan fragment will be listed as children of the root data 
sink.
+void SinkToJsonHelper(const TDataSink& sink,
+    const map<TPlanNodeId, TPlanNodeExecSummary>& summaries,
+    vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, 
Value* value) {
+  Value label(sink.label, document->GetAllocator());
+  value->AddMember("label", label, document->GetAllocator());
+
+  string label_detail_str = "";
+  switch (sink.type) {
+    case TDataSinkType::type::MERGE_SINK:
+    case TDataSinkType::type::MULTI_DATA_SINK:
+      label_detail_str = sink.child_data_sinks.at(0).label;
+      for (std::size_t i = 1; i < sink.child_data_sinks.size(); ++i) {
+        label_detail_str += ", ";
+        label_detail_str += sink.child_data_sinks.at(i).label;
+      }
+      break;
+    case TDataSinkType::type::TABLE_SINK:
+      label_detail_str = to_string(sink.table_sink.action);
+      break;
+    default:
+      // Should not call SinkToJsonHelper() with any other sink type.
+      DCHECK(false) << "Invalid sink type: " << sink.type;
+  }
+  Value label_detail(label_detail_str, document->GetAllocator());
+  value->AddMember("label_detail", label_detail, document->GetAllocator());
+
+  map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it =
+      summaries.find(SINK_ID);
+  DCHECK(summary_it != summaries.end());
+  ExecStatsToJsonHelper(summary_it->second, document, value);
+
+  Value children(kArrayType);
+
+  Value container(kObjectType);
+  PlanToJsonHelper(summaries, it, document, &container);
+  children.PushBack(container, document->GetAllocator());
+
+  value->AddMember("children", children, document->GetAllocator());
+}
+
 } // unnamed namespace
 
 void impala::PlanToJson(const vector<TPlanFragment>& fragments,
@@ -1153,32 +1204,43 @@ void impala::PlanToJson(const vector<TPlanFragment>& 
fragments,
   // Build a map from id to label so that we can resolve the targets of 
data-stream sinks
   // and connect plan fragments.
   map<TPlanNodeId, string> label_map;
-  for (const TPlanFragment& fragment: fragments) {
-    for (const TPlanNode& node: fragment.plan.nodes) {
+  for (const TPlanFragment& fragment : fragments) {
+    for (const TPlanNode& node : fragment.plan.nodes) {
       label_map[node.node_id] = node.label;
     }
   }
 
   map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries;
-  for (const TPlanNodeExecSummary& s: summary.nodes) {
-    exec_summaries[s.node_id] = s;
+  for (const TPlanNodeExecSummary& s : summary.nodes) {
+    // All sinks have -1 as node_id; we want to store the summary of the first 
one (the root
+    // of the plan tree), and insert() will not overwrite the existing value
+    // if the key is already present.
+    exec_summaries.insert({s.node_id, s});
   }
 
   Value nodes(kArrayType);
-  for (const TPlanFragment& fragment: fragments) {
+  for (const TPlanFragment& fragment : fragments) {
     Value plan_fragment(kObjectType);
     vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin();
-    PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, 
&plan_fragment);
-    if (fragment.__isset.output_sink) {
-      const TDataSink& sink = fragment.output_sink;
-      if (sink.__isset.stream_sink) {
-        Value target(label_map[sink.stream_sink.dest_node_id],
-            document->GetAllocator());
-        plan_fragment.AddMember("data_stream_target", target, 
document->GetAllocator());
-      } else if (sink.__isset.join_build_sink) {
-        Value target(label_map[sink.join_build_sink.dest_node_id],
-            document->GetAllocator());
-        plan_fragment.AddMember("join_build_target", target, 
document->GetAllocator());
+    if (fragment.__isset.output_sink
+        && (fragment.output_sink.type == TDataSinkType::type::MERGE_SINK
+            || fragment.output_sink.type == 
TDataSinkType::type::MULTI_DATA_SINK
+            || fragment.output_sink.type == TDataSinkType::type::TABLE_SINK)) {
+      SinkToJsonHelper(
+          fragment.output_sink, exec_summaries, &it, document, &plan_fragment);
+    } else {
+      PlanToJsonHelper(exec_summaries, &it, document, &plan_fragment);
+      if (fragment.__isset.output_sink) {
+        const TDataSink& sink = fragment.output_sink;
+        if (sink.__isset.stream_sink) {
+          Value target(
+              label_map[sink.stream_sink.dest_node_id], 
document->GetAllocator());
+          plan_fragment.AddMember("data_stream_target", target, 
document->GetAllocator());
+        } else if (sink.__isset.join_build_sink) {
+          Value target(
+              label_map[sink.join_build_sink.dest_node_id], 
document->GetAllocator());
+          plan_fragment.AddMember("join_build_target", target, 
document->GetAllocator());
+        }
       }
     }
     nodes.PushBack(plan_fragment, document->GetAllocator());
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index c23a4e910..8acb92cdc 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -1049,9 +1049,9 @@ struct TFinalizeParams {
   9: optional TIcebergDmlFinalizeParams iceberg_params;
 }
 
-// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
+// Result of call to ImpalaPlanService/JniFrontend.createExecRequest()
 struct TQueryExecRequest {
-  // exec info for all plans; the first one materializes the query result, and 
subsequent
+  // Exec info for all plans; the first one materializes the query result, and 
subsequent
   // plans materialize the build sides of joins. Each plan appears before its
   // dependencies in the list.
   1: optional list<TPlanExecInfo> plan_exec_info


Reply via email to