This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c446291ccf43e1c003ba6f402df754f624f89754 Author: Daniel Vanko <[email protected]> AuthorDate: Tue Feb 18 18:45:15 2025 +0100 IMPALA-13074: Add sink node to Web UI's graphical plan for DDL/DML queries From now on if a plan fragment's root data sink is a table sink, a multi data sink or a merge sink, it will be included in the JSON response and shown on the Web UI as parent of the plan fragment. Testing * adapted and refined impala-http-handler-test * added new tests for related sink types * tested manually on WebUI with - CTAS statements - UPDATE statements on Iceberg tables - DELETE statements on Iceberg tables - MERGE statements on Iceberg tables Change-Id: Ib2bd442f6499efde7406d87c2b1fd1b46a45381b Reviewed-on: http://gerrit.cloudera.org:8080/22496 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Noemi Pap-Takacs <[email protected]> Reviewed-by: Zoltan Borok-Nagy <[email protected]> --- be/src/service/impala-http-handler-test.cc | 504 +++++++++++++++++++++++++---- be/src/service/impala-http-handler.cc | 170 ++++++---- common/thrift/Query.thrift | 4 +- 3 files changed, 561 insertions(+), 117 deletions(-) diff --git a/be/src/service/impala-http-handler-test.cc b/be/src/service/impala-http-handler-test.cc index 69a5738c6..4524620b7 100644 --- a/be/src/service/impala-http-handler-test.cc +++ b/be/src/service/impala-http-handler-test.cc @@ -85,9 +85,9 @@ class ImpalaHttpHandlerTest : public testing::Test { if (id % 2 == 0) { node_summary.__set_is_broadcast(true); } - addStatsToSummary(id, &node_summary); + addStatsToSummary(id + 1, &node_summary); if (id % 3 == 0) { - addStatsToSummary(id + 1, &node_summary); + addStatsToSummary(id + 2, &node_summary); } return node_summary; } @@ -95,12 +95,12 @@ class ImpalaHttpHandlerTest : public testing::Test { // Should be kept synced with the logic in createTPlanNodeExecSummary. 
static void checkPlanNodeStats(const Value& plan_node, TPlanNodeId id) { ASSERT_TRUE(plan_node.IsObject()); - int output_card = (id + 1) * 2; + int output_card = (id + 2) * 2; if (id % 3 == 0) { if (id % 2 == 0) { - output_card = (id + 2) * 2; + output_card = (id + 3) * 2; } else { - output_card += (id + 2) * 2; + output_card += (id + 3) * 2; } } EXPECT_EQ(plan_node["output_card"].GetInt(), output_card); @@ -117,7 +117,7 @@ class ImpalaHttpHandlerTest : public testing::Test { EXPECT_FALSE(plan_node.HasMember("is_broadcast")); } - int max_time = id % 3 == 0 ? (id + 2) * 2000 : (id + 1) * 2000; + int max_time = id % 3 == 0 ? (id + 3) * 2000 : (id + 2) * 2000; EXPECT_EQ(plan_node["max_time_val"].GetInt(), max_time); EXPECT_EQ( @@ -144,6 +144,7 @@ class ImpalaHttpHandlerTest : public testing::Test { TDataSink sink; TDataStreamSink stream_sink; stream_sink.__set_dest_node_id(dest_node_id); + sink.__set_label("EXCHANGE SENDER"); sink.__set_type(TDataSinkType::DATA_STREAM_SINK); sink.__set_stream_sink(stream_sink); return sink; @@ -152,6 +153,7 @@ class ImpalaHttpHandlerTest : public testing::Test { static TDataSink createPlanRootSink() { TDataSink sink; TPlanRootSink plan_root_sink; + sink.__set_label("PLAN-ROOT SINK"); sink.__set_type(TDataSinkType::PLAN_ROOT_SINK); sink.__set_plan_root_sink(plan_root_sink); return sink; @@ -161,13 +163,44 @@ class ImpalaHttpHandlerTest : public testing::Test { TDataSink sink; TTableSink table_sink; THdfsTableSink hdfs_table_sink; + table_sink.__set_action(TSinkAction::INSERT); table_sink.__set_type(TTableSinkType::HDFS); table_sink.__set_hdfs_table_sink(hdfs_table_sink); + sink.__set_label("HDFS WRITER"); sink.__set_type(TDataSinkType::TABLE_SINK); sink.__set_table_sink(table_sink); return sink; } + static TDataSink createIcebergDeleteTableSink() { + TDataSink sink; + TTableSink table_sink; + TIcebergDeleteSink iceberg_delete_sink; + table_sink.__set_action(TSinkAction::DELETE); + table_sink.__set_type(TTableSinkType::HDFS); + 
table_sink.__set_iceberg_delete_sink(iceberg_delete_sink); + sink.__set_label("ICEBERG BUFFERED DELETER"); + sink.__set_type(TDataSinkType::TABLE_SINK); + sink.__set_table_sink(table_sink); + return sink; + } + + static TDataSink createMultiDataSink(const vector<TDataSink>& child_sinks) { + TDataSink sink; + sink.__set_type(TDataSinkType::MULTI_DATA_SINK); + sink.__set_label("MULTI DATA SINK"); + sink.__set_child_data_sinks(child_sinks); + return sink; + } + + static TDataSink createMergeSink(const vector<TDataSink>& child_sinks) { + TDataSink sink; + sink.__set_type(TDataSinkType::MERGE_SINK); + sink.__set_label("MERGE SINK"); + sink.__set_child_data_sinks(child_sinks); + return sink; + } + static void addSinkToFragment( const TDataSink& sink, TPlanFragment* fragment, TExecSummary* summary) { fragment->__set_output_sink(sink); @@ -258,22 +291,22 @@ const string ImpalaHttpHandlerTest::PLAN_SCHEMA_JSON = R"({ // { // "label": "01:EXCHANGE", // "label_detail": "UNPARTITIONED", -// "output_card": 4, +// "output_card": 6, // "num_instances": 2, // "is_broadcast": true, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "3.000us", +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "5.000us", // "children": [] // }, // { // "label": "00:SCAN HDFS", // "label_detail": "default.table", -// "output_card": 4, +// "output_card": 6, // "num_instances": 1, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "4.000us", +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "6.000us", // "children": [], // "data_stream_target": "01:EXCHANGE" // } @@ -321,24 +354,35 @@ static void prepareSelectStatement(const vector<string>& labels, // { // "plan_nodes": [ // { -// "label": "01:EXCHANGE", -// "label_detail": "UNPARTITIONED", -// "output_card": 4, -// "num_instances": 2, -// "is_broadcast": true, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "3.000us", -// "children": [] +// "label": "HDFS 
WRITER", +// "label_detail": "INSERT", +// "output_card": 2, +// "num_instances": 1, +// "max_time": "2.000us", +// "max_time_val": 2000, +// "avg_time": "2.000us", +// "children": [ +// { +// "label": "01:EXCHANGE", +// "label_detail": "UNPARTITIONED", +// "output_card": 6, +// "num_instances": 2, +// "is_broadcast": true, +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "5.000us", +// "children": [] +// } +// ] // }, // { // "label": "00:SCAN HDFS", // "label_detail": "default.table", -// "output_card": 4, +// "output_card": 6, // "num_instances": 1, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "4.000us", +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "6.000us", // "children": [], // "data_stream_target": "01:EXCHANGE" // } @@ -370,8 +414,7 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels, fragments->push_back(fragment00); } -// Prepares query plan for a join statement with one fragment and the join node having -// two children. +// Prepares query plan for a join statement with three fragments. 
// // F02:PLAN FRAGMENT // | @@ -396,42 +439,42 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels, // { // "label": "04:EXCHANGE", // "label_detail": "UNPARTITIONED", -// "output_card": 4, +// "output_card": 6, // "num_instances": 2, // "is_broadcast": true, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "3.000us", +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "5.000us", // "children": [] // }, // { // "label": "02:HASH_JOIN", // "label_detail": "INNER JOIN, BROADCAST", -// "output_card": 4, +// "output_card": 6, // "num_instances": 1, -// "max_time": "4.000us", -// "max_time_val": 4000, -// "avg_time": "4.000us", +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "6.000us", // "children": [ // { // "label": "01:SCAN HDFS", // "label_detail": "default.table1 t1", -// "output_card": 6, +// "output_card": 8, // "num_instances": 1, // "is_broadcast": true, -// "max_time": "6.000us", -// "max_time_val": 6000, -// "avg_time": "6.000us", +// "max_time": "8.000us", +// "max_time_val": 8000, +// "avg_time": "8.000us", // "children": [] // }, // { // "label": "03:EXCHANGE", // "label_detail": "BROADCAST", -// "output_card": 18, +// "output_card": 22, // "num_instances": 2, -// "max_time": "10.000us", -// "max_time_val": 10000, -// "avg_time": "9.000us", +// "max_time": "12.000us", +// "max_time_val": 12000, +// "avg_time": "11.000us", // "children": [] // } // ], @@ -440,12 +483,12 @@ static void prepareCreateTableAsSelectStatement(const vector<string>& labels, // { // "label": "00:SCAN HDFS", // "label_detail": "default.table2 t2", -// "output_card": 10, +// "output_card": 12, // "num_instances": 1, // "is_broadcast": true, -// "max_time": "10.000us", -// "max_time_val": 10000, -// "avg_time": "10.000us", +// "max_time": "12.000us", +// "max_time_val": 12000, +// "avg_time": "12.000us", // "children": [], // "data_stream_target": "03:EXCHANGE" // } @@ -494,6 +537,194 @@ static void 
prepareJoinStatement(const vector<string>& labels, fragments->push_back(fragment01); } +// Prepares query plan for an Iceberg update statement. Root sink will be a multi data +// sink, consisting of an HDFS table sink and an Iceberg delete table sink. +// +// F00:PLAN FRAGMENT +// | +// MULTI DATA SINK +// |->WRITE TO HDFS +// | +// |->BUFFERED DELETE FROM ICEBERG +// | +// 00:SCAN HDFS +// +// Expected JSON output for the plan will look like this: +// { +// "plan_nodes": [ +// { +// "label": "MULTI DATA SINK", +// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER", +// "output_card": 2, +// "num_instances": 1, +// "max_time": "2.000us", +// "max_time_val": 2000, +// "avg_time": "2.000us", +// "children": [ +// { +// "label": "00:SCAN HDFS", +// "label_detail": "default.table", +// "output_card": 6, +// "num_instances": 2, +// "is_broadcast": true, +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "5.000us", +// "children": [] +// } +// ] +// } +// ] +// } +static void prepareUpdateStatement(const vector<string>& labels, + const vector<string>& label_details, vector<TPlanFragment>* fragments, + TExecSummary* summary) { + ASSERT_EQ(labels.size(), 1); + ASSERT_EQ(label_details.size(), 1); + + // F00:PLAN FRAGMENT + TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0); + TDataSink multi_data_sink = ImpalaHttpHandlerTest::createMultiDataSink( + {ImpalaHttpHandlerTest::createHdfsTableSink(), + ImpalaHttpHandlerTest::createIcebergDeleteTableSink()}); + ImpalaHttpHandlerTest::addSinkToFragment(multi_data_sink, &fragment00, summary); + + // 00:SCAN HDFS + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(0), label_details.at(0), 0, &fragment00, summary); + fragments->push_back(fragment00); +} + +// Prepares query plan for an Iceberg merge statement. The root sink will be a +// merge sink, consisting of an HDFS writer and an Iceberg buffered delete sink. 
+// +// F00:PLAN FRAGMENT +// | +// MERGE SINK +// |->WRITE TO HDFS +// | +// |->BUFFERED DELETE FROM ICEBERG +// | +// 03:MERGE +// | +// 02:HASH JOIN +// | +// |--04:EXCHANGE +// | | +// | F01:PLAN FRAGMENT +// | 01:SCAN HDFS +// | +// 00:SCAN HDFS [functional_parquet.target, RANDOM] +// +// Expected JSON output for the plan will look like this: +// { +// "plan_nodes": [ +// { +// "label": "MERGE SINK", +// "label_detail": "HDFS WRITER, ICEBERG BUFFERED DELETER", +// "output_card": 2, +// "num_instances": 1, +// "max_time": "2.000us", +// "max_time_val": 2000, +// "avg_time": "2.000us", +// "children": [ +// { +// "label": "03:MERGE", +// "label_detail": "", +// "output_card": 6, +// "num_instances": 2, +// "is_broadcast": true, +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "5.000us", +// "children": [ +// { +// "label": "02:HASH JOIN", +// "label_detail": "INNER JOIN, BROADCAST", +// "output_card": 6, +// "num_instances": 1, +// "max_time": "6.000us", +// "max_time_val": 6000, +// "avg_time": "6.000us", +// "children": [ +// { +// "label": "04:EXCHANGE", +// "label_detail": "BROADCAST", +// "output_card": 8, +// "num_instances": 1, +// "is_broadcast": true, +// "max_time": "8.000us", +// "max_time_val": 8000, +// "avg_time": "8.000us", +// "children": [] +// }, +// { +// "label": "00:SCAN HDFS", +// "label_detail": "default.target", +// "output_card": 22, +// "num_instances": 2, +// "max_time": "12.000us", +// "max_time_val": 12000, +// "avg_time": "11.000us", +// "children": [] +// } +// ] +// } +// ] +// } +// ] +// }, +// { +// "label": "01:SCAN HDFS", +// "label_detail": "default.source", +// "output_card": 12, +// "num_instances": 1, +// "is_broadcast": true, +// "max_time": "12.000us", +// "max_time_val": 12000, +// "avg_time": "12.000us", +// "children": [], +// "data_stream_target": "04:EXCHANGE" +// } +// ] +// } +static void prepareMergeStatement(const vector<string>& labels, + const vector<string>& label_details, 
vector<TPlanFragment>* fragments, + TExecSummary* summary) { + ASSERT_EQ(labels.size(), 5); + ASSERT_EQ(label_details.size(), 5); + + // F00:PLAN FRAGMENT + TPlanFragment fragment00 = ImpalaHttpHandlerTest::createTPlanFragment(0); + TDataSink merge_sink = ImpalaHttpHandlerTest::createMergeSink( + {ImpalaHttpHandlerTest::createHdfsTableSink(), + ImpalaHttpHandlerTest::createIcebergDeleteTableSink()}); + ImpalaHttpHandlerTest::addSinkToFragment(merge_sink, &fragment00, summary); + // 03:MERGE + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(0), label_details.at(0), 1, &fragment00, summary); + // 02:HASH JOIN + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(1), label_details.at(1), 2, &fragment00, summary); + // 04:EXCHANGE + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(2), label_details.at(2), 0, &fragment00, summary); + // 00:SCAN HDFS + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(3), label_details.at(3), 0, &fragment00, summary); + fragments->push_back(fragment00); + + // F01:PLAN FRAGMENT + TPlanFragment fragment01 = ImpalaHttpHandlerTest::createTPlanFragment(1); + TDataSink stream_sink = + ImpalaHttpHandlerTest::createStreamSink(fragment00.plan.nodes[2].node_id); + ImpalaHttpHandlerTest::addSinkToFragment(stream_sink, &fragment01, summary); + // 01:SCAN HDFS + ImpalaHttpHandlerTest::addNodeToFragment( + labels.at(4), label_details.at(4), 0, &fragment01, summary); + fragments->push_back(fragment01); +} + } // namespace impala TEST_F(ImpalaHttpHandlerTest, SelectStatement) { @@ -512,8 +743,9 @@ TEST_F(ImpalaHttpHandlerTest, SelectStatement) { validatePlanSchema(document); + // Check the two nodes in the plan auto array = document["plan_nodes"].GetArray(); - EXPECT_EQ(array.Size(), 2); + ASSERT_EQ(array.Size(), 2); for (size_t i = 0; i < array.Size(); ++i) { EXPECT_EQ(array[i]["label"].GetString(), SELECT_LABELS.at(i)); EXPECT_EQ(array[i]["label_detail"].GetString(), SELECT_LABEL_DETAILS.at(i)); @@ -522,6 +754,10 @@ 
TEST_F(ImpalaHttpHandlerTest, SelectStatement) { EXPECT_EQ(array[i]["children"].Size(), 0); } + + // The data_stream_target of 00:SCAN HDFS should point to the 01:EXCHANGE node + ASSERT_TRUE(array[1].HasMember("data_stream_target")); + EXPECT_EQ(array[1]["data_stream_target"].GetString(), SELECT_LABELS[0]); } TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) { @@ -543,14 +779,37 @@ TEST_F(ImpalaHttpHandlerTest, CreateTableAsSelectStatement) { auto array = document["plan_nodes"].GetArray(); EXPECT_EQ(array.Size(), 2); - for (size_t i = 0; i < array.Size(); ++i) { - EXPECT_EQ(array[i]["label"].GetString(), CTAS_LABELS.at(i)); - EXPECT_EQ(array[i]["label_detail"].GetString(), CTAS_LABEL_DETAILS.at(i)); - checkPlanNodeStats(array[i], i); + // Check the HDFS WRITER node + ASSERT_GE(array.Size(), 1); + auto& hdfs_writer_node = array[0]; + EXPECT_STREQ(hdfs_writer_node["label"].GetString(), "HDFS WRITER"); + EXPECT_STREQ(hdfs_writer_node["label_detail"].GetString(), "INSERT"); - EXPECT_EQ(array[i]["children"].Size(), 0); - } + checkPlanNodeStats(hdfs_writer_node, -1); + + // Check 01:EXCHANGE, which is the child of HDFS WRITER node + EXPECT_EQ(hdfs_writer_node["children"].Size(), 1); + + auto& exch01_node = hdfs_writer_node["children"][0]; + EXPECT_EQ(exch01_node["label"].GetString(), CTAS_LABELS[0]); + EXPECT_EQ(exch01_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[0]); + + checkPlanNodeStats(exch01_node, 0); + EXPECT_EQ(exch01_node["children"].Size(), 0); + + // Check the 00:SCAN HDFS node + ASSERT_GE(array.Size(), 2); + auto& scan_node = array[1]; + EXPECT_EQ(scan_node["label"].GetString(), CTAS_LABELS[1]); + EXPECT_EQ(scan_node["label_detail"].GetString(), CTAS_LABEL_DETAILS[1]); + + checkPlanNodeStats(scan_node, 1); + EXPECT_EQ(scan_node["children"].Size(), 0); + + // The data_stream_target should point to the 01:EXCHANGE node + ASSERT_TRUE(scan_node.HasMember("data_stream_target")); + EXPECT_EQ(scan_node["data_stream_target"].GetString(), 
CTAS_LABELS[0]); } TEST_F(ImpalaHttpHandlerTest, JoinStatement) { @@ -575,6 +834,7 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) { EXPECT_EQ(array.Size(), 3); // Check the 04:EXCHANGE node + ASSERT_GE(array.Size(), 1); auto& exch04_node = array[0]; EXPECT_EQ(exch04_node["label"].GetString(), JOIN_LABELS[0]); EXPECT_EQ(exch04_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[0]); @@ -582,18 +842,21 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) { checkPlanNodeStats(exch04_node, 0); EXPECT_EQ(exch04_node["children"].Size(), 0); - // Check the 02:HASH_JOIN node + // Check the 02:HASH JOIN node + ASSERT_GE(array.Size(), 2); auto& join_node = array[1]; EXPECT_EQ(join_node["label"].GetString(), JOIN_LABELS[1]); EXPECT_EQ(join_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[1]); checkPlanNodeStats(join_node, 1); - EXPECT_TRUE(join_node.HasMember("data_stream_target")); + + // The data_stream_target of 02:HASH JOIN should point to the 04:EXCHANGE node + ASSERT_TRUE(join_node.HasMember("data_stream_target")); EXPECT_EQ(join_node["data_stream_target"], JOIN_LABELS[0]); - // Check the two children of join node + // Check the two children of 02:HASH JOIN node auto children = join_node["children"].GetArray(); - EXPECT_EQ(children.Size(), 2); + ASSERT_EQ(children.Size(), 2); for (size_t i = 0; i < children.Size(); ++i) { EXPECT_EQ(children[i]["label"].GetString(), JOIN_LABELS.at(i + 2)); @@ -605,12 +868,131 @@ TEST_F(ImpalaHttpHandlerTest, JoinStatement) { } // Check the 00:SCAN HDFS node + ASSERT_GE(array.Size(), 3); auto& scan_node = array[2]; EXPECT_EQ(scan_node["label"].GetString(), JOIN_LABELS[4]); EXPECT_EQ(scan_node["label_detail"].GetString(), JOIN_LABEL_DETAILS[4]); checkPlanNodeStats(scan_node, 4); EXPECT_EQ(scan_node["children"].Size(), 0); - EXPECT_TRUE(join_node.HasMember("data_stream_target")); + + // The data_stream_target of 00:SCAN HDFS should point to the 03:EXCHANGE node + ASSERT_TRUE(join_node.HasMember("data_stream_target")); 
EXPECT_EQ(scan_node["data_stream_target"].GetString(), JOIN_LABELS[3]); +} + +TEST_F(ImpalaHttpHandlerTest, UpdateStatement) { + const vector<string> UPDATE_LABELS = {"00:SCAN HDFS"}; + const vector<string> UPDATE_LABEL_DETAILS = {"default.table"}; + vector<TPlanFragment> fragments; + TExecSummary summary; + + prepareUpdateStatement(UPDATE_LABELS, UPDATE_LABEL_DETAILS, &fragments, &summary); + + Document document; + Value value(kObjectType); + + PlanToJson(fragments, summary, &document, &value); + document.CopyFrom(value, document.GetAllocator()); + + validatePlanSchema(document); + + auto array = document["plan_nodes"].GetArray(); + + // Check the MULTI DATA SINK node + ASSERT_EQ(array.Size(), 1); + + auto& multi_data_sink_node = array[0]; + EXPECT_STREQ(multi_data_sink_node["label"].GetString(), "MULTI DATA SINK"); + EXPECT_STREQ(multi_data_sink_node["label_detail"].GetString(), + "HDFS WRITER, ICEBERG BUFFERED DELETER"); + + checkPlanNodeStats(multi_data_sink_node, -1); + + // Check 00:SCAN HDFS node, which is the child of MULTI DATA SINK node + ASSERT_EQ(multi_data_sink_node["children"].Size(), 1); + + auto& scan_node = multi_data_sink_node["children"][0]; + EXPECT_EQ(scan_node["label"].GetString(), UPDATE_LABELS[0]); + EXPECT_EQ(scan_node["label_detail"].GetString(), UPDATE_LABEL_DETAILS[0]); + + checkPlanNodeStats(scan_node, 0); + EXPECT_EQ(scan_node["children"].Size(), 0); +} + +TEST_F(ImpalaHttpHandlerTest, MergeStatement) { + const vector<string> MERGE_LABELS = { + "03:MERGE", "02:HASH JOIN", "04:EXCHANGE", "00:SCAN HDFS", "01:SCAN HDFS"}; + const vector<string> MERGE_LABEL_DETAILS = { + "", "INNER JOIN, BROADCAST", "BROADCAST", "default.target", "default.source"}; + vector<TPlanFragment> fragments; + TExecSummary summary; + + prepareMergeStatement(MERGE_LABELS, MERGE_LABEL_DETAILS, &fragments, &summary); + + Document document; + Value value(kObjectType); + + PlanToJson(fragments, summary, &document, &value); + document.CopyFrom(value, 
document.GetAllocator()); + + validatePlanSchema(document); + + auto array = document["plan_nodes"].GetArray(); + EXPECT_EQ(array.Size(), 2); + + // Check the MERGE SINK node + ASSERT_GE(array.Size(), 1); + + auto& merge_sink_node = array[0]; + EXPECT_STREQ(merge_sink_node["label"].GetString(), "MERGE SINK"); + EXPECT_STREQ(merge_sink_node["label_detail"].GetString(), + "HDFS WRITER, ICEBERG BUFFERED DELETER"); + + checkPlanNodeStats(merge_sink_node, -1); + + // Check 03:MERGE node, which is the child of the MERGE SINK node + ASSERT_EQ(merge_sink_node["children"].Size(), 1); + + auto& merge_node = merge_sink_node["children"][0]; + EXPECT_EQ(merge_node["label"].GetString(), MERGE_LABELS[0]); + EXPECT_EQ(merge_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[0]); + + checkPlanNodeStats(merge_node, 0); + + // Check the 02:HASH JOIN node, which is a child of the 03:MERGE node + ASSERT_EQ(merge_node["children"].Size(), 1); + + auto& hash_join_node = merge_node["children"][0]; + EXPECT_EQ(hash_join_node["label"].GetString(), MERGE_LABELS[1]); + EXPECT_EQ(hash_join_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[1]); + + checkPlanNodeStats(hash_join_node, 1); + + // Check the two children of the 02:HASH JOIN node + auto children = hash_join_node["children"].GetArray(); + ASSERT_EQ(children.Size(), 2); + + for (size_t i = 0; i < children.Size(); ++i) { + EXPECT_EQ(children[i]["label"].GetString(), MERGE_LABELS.at(i + 2)); + EXPECT_EQ(children[i]["label_detail"].GetString(), MERGE_LABEL_DETAILS.at(i + 2)); + + checkPlanNodeStats(children[i], i + 2); + + EXPECT_EQ(children[i]["children"].Size(), 0); + } + + // Check the 01:SCAN HDFS node + ASSERT_EQ(array.Size(), 2); + + auto& scan_node = array[1]; + EXPECT_EQ(scan_node["label"].GetString(), MERGE_LABELS[4]); + EXPECT_EQ(scan_node["label_detail"].GetString(), MERGE_LABEL_DETAILS[4]); + + checkPlanNodeStats(scan_node, 4); + EXPECT_EQ(scan_node["children"].Size(), 0); + + // The data_stream_target of 01:SCAN HDFS 
should point to the 04:EXCHANGE node + ASSERT_TRUE(scan_node.HasMember("data_stream_target")); + EXPECT_EQ(scan_node["data_stream_target"].GetString(), MERGE_LABELS[2]); } \ No newline at end of file diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 80cabca08..849a9c716 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -1083,12 +1083,50 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::WebRequest& req, namespace { +// Summary is stored with -1 as id if it is for a data sink at the root of a fragment. +constexpr int SINK_ID = -1; + +void ExecStatsToJsonHelper( + const TPlanNodeExecSummary& summary, rapidjson::Document* document, Value* value) { + int64_t cardinality = 0; + int64_t max_time = 0; + int64_t total_time = 0; + for (const TExecStats& stat : summary.exec_stats) { + if (summary.is_broadcast) { + // Avoid multiple-counting for recipients of broadcasts. + cardinality = ::max(cardinality, stat.cardinality); + } else { + cardinality += stat.cardinality; + } + total_time += stat.latency_ns; + max_time = ::max(max_time, stat.latency_ns); + } + value->AddMember("output_card", cardinality, document->GetAllocator()); + value->AddMember("num_instances", static_cast<uint64_t>(summary.exec_stats.size()), + document->GetAllocator()); + if (summary.is_broadcast) { + value->AddMember("is_broadcast", true, document->GetAllocator()); + } + + const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS); + Value max_time_str_json(max_time_str, document->GetAllocator()); + value->AddMember("max_time", max_time_str_json, document->GetAllocator()); + value->AddMember("max_time_val", max_time, document->GetAllocator()); + + // Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a + // ns. See IMPALA-1800. 
+ const string& avg_time_str = PrettyPrinter::Print( + // A bug may occasionally cause 1-instance nodes to appear to have 0 instances. + total_time / ::max(static_cast<int>(summary.exec_stats.size()), 1), TUnit::TIME_NS); + Value avg_time_str_json(avg_time_str, document->GetAllocator()); + value->AddMember("avg_time", avg_time_str_json, document->GetAllocator()); +} + // Helper for PlanToJson(), processes a single list of plan nodes which are the // DFS-flattened representation of a single plan fragment. Called recursively, the // iterator parameter is updated in place so that when a recursive call returns, the // caller is pointing at the next of its children. void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries, - const vector<TPlanNode>& nodes, vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) { Value children(kArrayType); Value label((*it)->label, document->GetAllocator()); @@ -1098,54 +1136,67 @@ void PlanToJsonHelper(const map<TPlanNodeId, TPlanNodeExecSummary>& summaries, value->AddMember("label_detail", label_detail, document->GetAllocator()); TPlanNodeId id = (*it)->node_id; - map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary = summaries.find(id); - if (summary != summaries.end()) { - int64_t cardinality = 0; - int64_t max_time = 0L; - int64_t total_time = 0; - for (const TExecStats& stat: summary->second.exec_stats) { - if (summary->second.is_broadcast) { - // Avoid multiple-counting for recipients of broadcasts. 
- cardinality = ::max(cardinality, stat.cardinality); - } else { - cardinality += stat.cardinality; - } - total_time += stat.latency_ns; - max_time = ::max(max_time, stat.latency_ns); - } - value->AddMember("output_card", cardinality, document->GetAllocator()); - value->AddMember("num_instances", - static_cast<uint64_t>(summary->second.exec_stats.size()), - document->GetAllocator()); - if (summary->second.is_broadcast) { - value->AddMember("is_broadcast", true, document->GetAllocator()); - } - - const string& max_time_str = PrettyPrinter::Print(max_time, TUnit::TIME_NS); - Value max_time_str_json(max_time_str, document->GetAllocator()); - value->AddMember("max_time", max_time_str_json, document->GetAllocator()); - value->AddMember("max_time_val", max_time, document->GetAllocator()); - - // Round to the nearest ns, to workaround a bug in pretty-printing a fraction of a - // ns. See IMPALA-1800. - const string& avg_time_str = PrettyPrinter::Print( - // A bug may occasionally cause 1-instance nodes to appear to have 0 instances. 
- total_time / ::max(static_cast<int>(summary->second.exec_stats.size()), 1), - TUnit::TIME_NS); - Value avg_time_str_json(avg_time_str, document->GetAllocator()); - value->AddMember("avg_time", avg_time_str_json, document->GetAllocator()); + map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it = summaries.find(id); + if (summary_it != summaries.end()) { + ExecStatsToJsonHelper(summary_it->second, document, value); } int num_children = (*it)->num_children; for (int i = 0; i < num_children; ++i) { ++(*it); Value container(kObjectType); - PlanToJsonHelper(summaries, nodes, it, document, &container); + PlanToJsonHelper(summaries, it, document, &container); children.PushBack(container, document->GetAllocator()); } value->AddMember("children", children, document->GetAllocator()); } +// Helper for PlanToJson(), called only when the plan fragment's root data sink must be +// one of the following types: +// - table sink, +// - multi data sink, +// - merge sink. +// Plan nodes of the plan fragment will be listed as children of the root data sink. +void SinkToJsonHelper(const TDataSink& sink, + const map<TPlanNodeId, TPlanNodeExecSummary>& summaries, + vector<TPlanNode>::const_iterator* it, rapidjson::Document* document, Value* value) { + Value label(sink.label, document->GetAllocator()); + value->AddMember("label", label, document->GetAllocator()); + + string label_detail_str = ""; + switch (sink.type) { + case TDataSinkType::type::MERGE_SINK: + case TDataSinkType::type::MULTI_DATA_SINK: + label_detail_str = sink.child_data_sinks.at(0).label; + for (std::size_t i = 1; i < sink.child_data_sinks.size(); ++i) { + label_detail_str += ", "; + label_detail_str += sink.child_data_sinks.at(i).label; + } + break; + case TDataSinkType::type::TABLE_SINK: + label_detail_str = to_string(sink.table_sink.action); + break; + default: + // Should not call SinkToJsonHelper() with any other sink type. 
+ DCHECK(false) << "Invalid sink type: " << sink.type; + } + Value label_detail(label_detail_str, document->GetAllocator()); + value->AddMember("label_detail", label_detail, document->GetAllocator()); + + map<TPlanNodeId, TPlanNodeExecSummary>::const_iterator summary_it = + summaries.find(SINK_ID); + DCHECK(summary_it != summaries.end()); + ExecStatsToJsonHelper(summary_it->second, document, value); + + Value children(kArrayType); + + Value container(kObjectType); + PlanToJsonHelper(summaries, it, document, &container); + children.PushBack(container, document->GetAllocator()); + + value->AddMember("children", children, document->GetAllocator()); +} + } // unnamed namespace void impala::PlanToJson(const vector<TPlanFragment>& fragments, @@ -1153,32 +1204,43 @@ void impala::PlanToJson(const vector<TPlanFragment>& fragments, // Build a map from id to label so that we can resolve the targets of data-stream sinks // and connect plan fragments. map<TPlanNodeId, string> label_map; - for (const TPlanFragment& fragment: fragments) { - for (const TPlanNode& node: fragment.plan.nodes) { + for (const TPlanFragment& fragment : fragments) { + for (const TPlanNode& node : fragment.plan.nodes) { label_map[node.node_id] = node.label; } } map<TPlanNodeId, TPlanNodeExecSummary> exec_summaries; - for (const TPlanNodeExecSummary& s: summary.nodes) { - exec_summaries[s.node_id] = s; + for (const TPlanNodeExecSummary& s : summary.nodes) { + // All sinks have -1 as node_id. We want to store the summary of the first one (the + // root of the plan tree); insert() does not overwrite the existing value + // if the key is already present. 
+ exec_summaries.insert({s.node_id, s}); } Value nodes(kArrayType); - for (const TPlanFragment& fragment: fragments) { + for (const TPlanFragment& fragment : fragments) { Value plan_fragment(kObjectType); vector<TPlanNode>::const_iterator it = fragment.plan.nodes.begin(); - PlanToJsonHelper(exec_summaries, fragment.plan.nodes, &it, document, &plan_fragment); - if (fragment.__isset.output_sink) { - const TDataSink& sink = fragment.output_sink; - if (sink.__isset.stream_sink) { - Value target(label_map[sink.stream_sink.dest_node_id], - document->GetAllocator()); - plan_fragment.AddMember("data_stream_target", target, document->GetAllocator()); - } else if (sink.__isset.join_build_sink) { - Value target(label_map[sink.join_build_sink.dest_node_id], - document->GetAllocator()); - plan_fragment.AddMember("join_build_target", target, document->GetAllocator()); + if (fragment.__isset.output_sink + && (fragment.output_sink.type == TDataSinkType::type::MERGE_SINK + || fragment.output_sink.type == TDataSinkType::type::MULTI_DATA_SINK + || fragment.output_sink.type == TDataSinkType::type::TABLE_SINK)) { + SinkToJsonHelper( + fragment.output_sink, exec_summaries, &it, document, &plan_fragment); + } else { + PlanToJsonHelper(exec_summaries, &it, document, &plan_fragment); + if (fragment.__isset.output_sink) { + const TDataSink& sink = fragment.output_sink; + if (sink.__isset.stream_sink) { + Value target( + label_map[sink.stream_sink.dest_node_id], document->GetAllocator()); + plan_fragment.AddMember("data_stream_target", target, document->GetAllocator()); + } else if (sink.__isset.join_build_sink) { + Value target( + label_map[sink.join_build_sink.dest_node_id], document->GetAllocator()); + plan_fragment.AddMember("join_build_target", target, document->GetAllocator()); + } } } nodes.PushBack(plan_fragment, document->GetAllocator()); diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index c23a4e910..8acb92cdc 100644 --- a/common/thrift/Query.thrift +++ 
b/common/thrift/Query.thrift @@ -1049,9 +1049,9 @@ struct TFinalizeParams { 9: optional TIcebergDmlFinalizeParams iceberg_params; } -// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest() +// Result of call to ImpalaPlanService/JniFrontend.createExecRequest() struct TQueryExecRequest { - // exec info for all plans; the first one materializes the query result, and subsequent + // Exec info for all plans; the first one materializes the query result, and subsequent // plans materialize the build sides of joins. Each plan appears before its // dependencies in the list. 1: optional list<TPlanExecInfo> plan_exec_info
