This is an automated email from the ASF dual-hosted git repository. dbecker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3db89e6df08f2c859926bdf397e69099e21660d1 Author: jasonmfehr <[email protected]> AuthorDate: Tue Aug 6 17:12:08 2024 -0700 IMPALA-12737: Add HS2 support to the InternalServer class. The existing methods in the InternalServer class allow for retrieving query results via tab delimited text. This method is not robust when accuracy is needed for query results. This commit adds the ability to use the HS2 protocol to execute queries with the results returned as a vector of Thrift TRow objects. This new method is not used in this commit but will be used in subsequent commits for IMPALA-12737. Testing was accomplished by adding a new unit ctest. Change-Id: Idf4b278b69ffb2beef3ea5bc1eb9335d9c27a5c8 Reviewed-on: http://gerrit.cloudera.org:8080/21650 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/impala-server.h | 12 ++-- be/src/service/internal-server-test.cc | 105 +++++++++++++++++++++++++-------- be/src/service/internal-server.cc | 45 ++++++++------ be/src/service/internal-server.h | 65 +++++++++++--------- 4 files changed, 153 insertions(+), 74 deletions(-) diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 68a803e7f..8533fd9b3 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -36,6 +36,7 @@ #include "gen-cpp/ImpalaService.h" #include "gen-cpp/control_service.pb.h" #include "gen-cpp/Query_types.h" +#include "gen-cpp/TCLIService_types.h" #include "kudu/util/random.h" #include "rpc/thrift-server.h" #include "runtime/types.h" @@ -415,16 +416,19 @@ class ImpalaServer : public ImpalaServiceIf, virtual Status ExecuteIgnoreResults(const std::string& user_name, const std::string& sql, const QueryOptionMap& query_opts = {}, const bool persist_in_db = true, TUniqueId* query_id = nullptr); - virtual Status ExecuteAndFetchAllText(const std::string& user_name, - const std::string& sql, query_results& results, results_columns* columns = nullptr, - TUniqueId* query_id = nullptr); + virtual Status ExecuteAndFetchAllHS2(const std::string& user_name, + const std::string& sql, + std::vector<apache::hive::service::cli::thrift::TRow>& results, + const QueryOptionMap& query_opts = {}, const bool persist_in_db = true, + results_columns* columns = nullptr, TUniqueId* query_id = nullptr); virtual Status SubmitAndWait(const std::string& user_name, const std::string& sql, TUniqueId& new_session_id, TUniqueId& new_query_id, const QueryOptionMap& query_opts = {}, const bool persist_in_db = true); virtual Status WaitForResults(TUniqueId& query_id); virtual Status SubmitQuery(const std::string& sql, const impala::TUniqueId& session_id, TUniqueId& new_query_id, const bool persist_in_db = true); - virtual Status FetchAllRows(const TUniqueId& query_id, query_results& results, + virtual Status FetchAllRowsHS2(const TUniqueId& query_id, + std::vector<apache::hive::service::cli::thrift::TRow>& results, results_columns* columns = nullptr); virtual void CloseQuery(const TUniqueId& query_id); virtual void GetConnectionContextList( diff --git a/be/src/service/internal-server-test.cc b/be/src/service/internal-server-test.cc index ecc184283..77592c60d 100644 --- a/be/src/service/internal-server-test.cc +++ b/be/src/service/internal-server-test.cc @@ -23,6 +23,7 @@ #include "codegen/llvm-codegen.h" #include "common/status.h" #include "common/thread-debug-info.h" +#include "gen-cpp/TCLIService_types.h" #include "gtest/gtest.h" #include "gutil/strings/strcat.h" #include "gutil/walltime.h" @@ -39,6 +40,7 @@ #include "testutil/scoped-flag-setter.h" #include "util/debug-util.h" #include "util/jni-util.h" +#include "util/string-util.h" DECLARE_string(log_dir); DECLARE_string(debug_actions); @@ -59,6 +61,7 @@ DECLARE_int32(webserver_port); using namespace std; using namespace impala; using namespace rapidjson; +using namespace apache::hive::service::cli::thrift; shared_ptr<ImpalaServer> impala_server_; @@ -110,7 +113,7 @@ void assertQueryState(const TUniqueId& query_id, const string expected_state) { } // assertQueryState // Helper class to set up a uniquely named database. Not every test will need its own -// database, thus an instance of this class must be instantiated in every that that needs +// database, thus an instance of this class must be instantiated in every test that needs // its own database. // // Upon construction, instances of this class create a database consisting of the @@ -259,7 +262,7 @@ TEST(InternalServerTest, InvalidQueryOption) { // Asserts that executing multiple queries over multiple sessions against the same // internal server instance works correctly. TEST(InternalServerTest, MultipleQueriesMultipleSessions) { - query_results results = make_shared<vector<string>>(); + vector<TRow> results; InternalServer* fixture = impala_server_.get(); DatabaseTest db_test = DatabaseTest(impala_server_, "multiple_queries_multiple_sessions"); @@ -267,11 +270,11 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) { // Set up a test table using a new session. TUniqueId query_id; - ASSERT_OK(fixture->ExecuteAndFetchAllText("impala", - StrCat("create table if not exists ", test_table_name, - "(id INT, first_name STRING, last_name STRING)"), results, nullptr, &query_id)); - ASSERT_EQ(1, results->size()); - ASSERT_EQ(results->at(0), "Table has been created."); + ASSERT_OK(fixture->ExecuteAndFetchAllHS2("impala", StrCat("create table if not exists ", + test_table_name, "(id INT, first_name STRING, last_name STRING)"), results, {}, + false, nullptr, &query_id)); + ASSERT_EQ(1, results.size()); + ASSERT_EQ(results[0].colVals[0].stringVal.value, "Table has been created."); assertQueryState(query_id, QUERY_STATE_SUCCESS); // Insert a record into the test table using a new session. @@ -281,10 +284,10 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) { assertQueryState(query_id, QUERY_STATE_SUCCESS); // Select a record from the test table using a new session. - results->clear(); + results.clear(); results_columns columns; - ASSERT_OK(fixture->ExecuteAndFetchAllText("impala", StrCat("select id,first_name," - "last_name FROM ", test_table_name), results, &columns, &query_id)); + ASSERT_OK(fixture->ExecuteAndFetchAllHS2("impala", StrCat("select id,first_name," + "last_name FROM ", test_table_name), results, {}, false, &columns, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); ASSERT_EQ(3, columns.size()); @@ -295,8 +298,10 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) { EXPECT_EQ("last_name", columns.at(2).first); EXPECT_EQ("string", columns.at(2).second); - ASSERT_EQ(1, results->size()); - EXPECT_EQ(results->at(0), "1\ttest\tperson1"); + ASSERT_EQ(1, results.size()); + EXPECT_EQ(results[0].colVals[0].i32Val.value, 1); + EXPECT_EQ(results[0].colVals[1].stringVal.value, "test"); + EXPECT_EQ(results[0].colVals[2].stringVal.value, "person1"); } // MultipleQueriesMultipleSessions // Simulates an RPC failure which causes the coordinator to automatically retry the query. @@ -334,7 +339,7 @@ TEST(InternalServerTest, RetryFailedQuery) { // Asserts that executing multiple queries in one session against the same internal // server instance works correctly. TEST(InternalServerTest, MultipleQueriesOneSession) { - query_results results = make_shared<vector<string>>(); + vector<TRow> results; InternalServer* fixture = impala_server_.get(); DatabaseTest db_test = DatabaseTest(impala_server_, "multiple_queries_one_session"); const string test_table_name = StrCat(db_test.GetDbName(), ".test_table_1"); @@ -348,12 +353,12 @@ TEST(InternalServerTest, MultipleQueriesOneSession) { ASSERT_OK(fixture->SubmitQuery(StrCat("create table if not exists ", test_table_name, "(id INT,name STRING)"), session_id, query_id1)); ASSERT_OK(fixture->WaitForResults(query_id1)); - ASSERT_OK(fixture->FetchAllRows(query_id1, results)); + ASSERT_OK(fixture->FetchAllRowsHS2(query_id1, results)); // Assert the test table was created. - ASSERT_EQ(1, results->size()); - EXPECT_EQ(results->at(0), "Table has been created."); - results->clear(); + ASSERT_EQ(1, results.size()); + EXPECT_EQ(results[0].colVals[0].stringVal.value, "Table has been created."); + results.clear(); fixture->CloseQuery(query_id1); // In the same session, insert into the newly created test table. @@ -361,10 +366,10 @@ TEST(InternalServerTest, MultipleQueriesOneSession) { ASSERT_OK(fixture->SubmitQuery(StrCat("insert into ", test_table_name, " (id, name) VALUES (1, 'one'), (2, 'two')"), session_id, query_id2)); ASSERT_OK(fixture->WaitForResults(query_id2)); - ASSERT_OK(fixture->FetchAllRows(query_id2, results)); + ASSERT_OK(fixture->FetchAllRowsHS2(query_id2, results)); // Assert the insert succeeded. - ASSERT_EQ(0, results->size()); + ASSERT_EQ(0, results.size()); fixture->CloseQuery(query_id2); // Still in the same session, select from the test table. @@ -374,7 +379,7 @@ TEST(InternalServerTest, MultipleQueriesOneSession) { ASSERT_OK(fixture->SubmitQuery(StrCat("select name,id,name from ", test_table_name, " order by id asc"), session_id, query_id3)); ASSERT_OK(fixture->WaitForResults(query_id3)); - ASSERT_OK(fixture->FetchAllRows(query_id3, results, &columns)); + ASSERT_OK(fixture->FetchAllRowsHS2(query_id3, results, &columns)); // Assert the expected number of columns were returned from the select statement. ASSERT_EQ(3, columns.size()); @@ -386,9 +391,15 @@ TEST(InternalServerTest, MultipleQueriesOneSession) { EXPECT_EQ("string", columns.at(2).second); // Assert the expected number of rows were returned from the select statement. - ASSERT_EQ(2, results->size()); - EXPECT_EQ(results->at(0), "one\t1\tone"); - EXPECT_EQ(results->at(1), "two\t2\ttwo"); + ASSERT_EQ(2, results.size()); + EXPECT_EQ(results[0].colVals[0].stringVal.value, "one"); + EXPECT_EQ(results[0].colVals[1].i32Val.value, 1); + EXPECT_EQ(results[0].colVals[2].stringVal.value, "one"); + + EXPECT_EQ(results[1].colVals[0].stringVal.value, "two"); + EXPECT_EQ(results[1].colVals[1].i32Val.value, 2); + EXPECT_EQ(results[1].colVals[2].stringVal.value, "two"); + fixture->CloseQuery(query_id3); assertQueryState(query_id3, QUERY_STATE_SUCCESS); @@ -504,6 +515,54 @@ TEST(InternalServerTest, SimultaneousMultipleQueriesOneSession) { fixture->CloseSession(session_id); } // TEST SimultaneousMultipleQueriesOneSession +TEST(InternalServerTest, ExecuteAndFetchAllHS2) { + vector<apache::hive::service::cli::thrift::TRow> results_hs2; + InternalServer* fixture = impala_server_.get(); + DatabaseTest db_test = DatabaseTest(impala_server_, "execute_and_fetch_all_hs2"); + const string test_table_name = StrCat(db_test.GetDbName(), ".test_table_1"); + const string name_prefix = StrCat("tv_", GetCurrentTimeMicros() / 1000000, "_"); + const int num_test_rows = 10000; + + // Create a test table. + ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("create table ", + test_table_name, "(id INT,name STRING)"))); + + // In the same session, insert into the newly created test table. + StringStreamPop sql; + sql << StrCat("insert into ", test_table_name, " (id,name) VALUES "); + for (int i=0; i<num_test_rows; i++) { + sql << "(" << i << ",'" << name_prefix << i << "'),"; + } + + sql.move_back(); + sql << " "; + + ASSERT_OK(fixture->ExecuteIgnoreResults("impala", sql.str())); + + // select from the test table. + TUniqueId query_id3; + results_columns columns; + + ASSERT_OK(fixture->ExecuteAndFetchAllHS2("impala", StrCat("select id, name from ", + test_table_name, " order by id asc"), results_hs2, {}, false, &columns)); + + // Assert the expected number of columns were returned from the select statement. + ASSERT_EQ(2, columns.size()); + EXPECT_EQ("id", columns.at(0).first); + EXPECT_EQ("int", columns.at(0).second); + EXPECT_EQ("name", columns.at(1).first); + EXPECT_EQ("string", columns.at(1).second); + + // Assert the expected number of rows were returned from the select statement. + ASSERT_EQ(num_test_rows, results_hs2.size()); + + for (int i=0; i<num_test_rows; i++) { + ASSERT_EQ(2, results_hs2[i].colVals.size()); + EXPECT_EQ(i, results_hs2[i].colVals[0].i32Val.value); + EXPECT_EQ(StrCat(name_prefix, i), results_hs2[i].colVals[1].stringVal.value); + } +} // TEST ExecuteAndFetchAllHS2 + } // namespace internalservertest } // namespace impala diff --git a/be/src/service/internal-server.cc b/be/src/service/internal-server.cc index 19c0ae5db..4460b6677 100644 --- a/be/src/service/internal-server.cc +++ b/be/src/service/internal-server.cc @@ -21,6 +21,7 @@ #include "common/status.h" #include "gen-cpp/ErrorCodes_types.h" #include "gen-cpp/Query_types.h" +#include "gen-cpp/TCLIService_types.h" #include "gen-cpp/Types_types.h" #include "rpc/thrift-server.h" #include "runtime/query-driver.h" @@ -30,9 +31,15 @@ #include "util/uid-util.h" using namespace std; +using apache::hive::service::cli::thrift::TProtocolVersion; +using apache::hive::service::cli::thrift::TRow; +using apache::hive::service::cli::thrift::TRowSet; namespace impala { +// equivalent to 8 row batches (assuming the default batch size) +static const int32_t ROWS_TO_FETCH = 8192; + Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session_id, const QueryOptionMap& query_opts) { shared_ptr<ThriftServer::ConnectionContext> conn_ctx = @@ -120,21 +127,22 @@ Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const string& return result; } //ImpalaServer::ExecuteIgnoreResults -Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name, - const std::string& sql, query_results& results, results_columns* columns, - TUniqueId* query_id){ +Status ImpalaServer::ExecuteAndFetchAllHS2(const std::string& user_name, + const std::string& sql, vector<TRow>& results, const QueryOptionMap& query_opts, + const bool persist_in_db, results_columns* columns, TUniqueId* query_id) { TUniqueId session_id; TUniqueId internal_query_id; Status result; - result = SubmitAndWait(user_name, sql, session_id, internal_query_id); + result = SubmitAndWait(user_name, sql, session_id, internal_query_id, query_opts, + persist_in_db); if (query_id != nullptr) { *query_id = internal_query_id; } if (result.ok()) { - result = FetchAllRows(internal_query_id, results, columns); + result = FetchAllRowsHS2(internal_query_id, results, columns); } if (!UUIDEmpty(internal_query_id)) { @@ -144,7 +152,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name, CloseSession(session_id); return result; -} // ImpalaServer::ExecuteAndFetchAllText +} // ImpalaServer::ExecuteAndFetchAllHS2 Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql, TUniqueId& new_session_id, TUniqueId& new_query_id, const QueryOptionMap& query_opts, @@ -208,11 +216,9 @@ Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id, return SetQueryInflight(session_state, query_handle); } // ImpalaServer::SubmitQuery -Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& results, - results_columns* columns) { - QueryResultSet* result_set; +Status ImpalaServer::FetchAllRowsHS2(const TUniqueId& query_id, + vector<TRow>& query_results, results_columns* columns) { const TResultSetMetadata* results_metadata; - vector<string> row_set; QueryHandle query_handle; RETURN_IF_ERROR(GetActiveQueryHandle(query_id, &query_handle)); @@ -229,7 +235,7 @@ Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& resu // populate column vector if provided by the user if (columns != nullptr) { - for (int i=0; i<results_metadata->columns.size(); i++) { + for (int i = 0; i < results_metadata->columns.size(); i++) { // TODO: As of today, the ODBC driver does not support boolean and timestamp data // type but it should. This is tracked by ODBC-189. We should verify that our // boolean and timestamp type are correctly recognized when ODBC-189 is closed. @@ -239,9 +245,6 @@ Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& resu ColumnTypeToBeeswaxTypeString(type))); } } - - result_set = QueryResultSet::CreateAsciiQueryResultSet( - *results_metadata, &row_set, true); } int64_t block_wait_time = 30000000; @@ -249,14 +252,20 @@ Status ImpalaServer::FetchAllRows(const TUniqueId& query_id, query_results& resu lock_guard<mutex> l1(*query_handle->fetch_rows_lock()); lock_guard<mutex> l2(*query_handle->lock()); - row_set.clear(); + QueryResultSet* result_set; + TRowSet row_set; + + result_set = QueryResultSet::CreateHS2ResultSet( + TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1, *results_metadata, &row_set, + false, 0); - RETURN_IF_ERROR(query_handle->FetchRows(10, result_set, block_wait_time)); - results->insert(results->cend(), row_set.cbegin(), row_set.cend()); + RETURN_IF_ERROR(query_handle->FetchRows(ROWS_TO_FETCH, result_set, block_wait_time)); + query_results.insert(query_results.cend(), row_set.rows.cbegin(), + row_set.rows.cend()); } return Status::OK(); -} // ImpalaServer::FetchAllRows +} // ImpalaServer::FetchAllRowsHS2 void ImpalaServer::CloseQuery(const TUniqueId& query_id) { QueryHandle query_handle; diff --git a/be/src/service/internal-server.h b/be/src/service/internal-server.h index 65e78bdee..45dd5317b 100644 --- a/be/src/service/internal-server.h +++ b/be/src/service/internal-server.h @@ -18,18 +18,17 @@ #pragma once #include <map> -#include <memory> #include <string> #include <vector> #include "common/status.h" #include "gen-cpp/Query_types.h" +#include "gen-cpp/TCLIService_types.h" #include "gen-cpp/Types_types.h" #include "rpc/thrift-server.h" namespace impala { - typedef std::shared_ptr<std::vector<std::string>> query_results; typedef std::vector<std::pair<std::string, std::string>> results_columns; /// Enables Impala coordinators to submit queries to themselves. @@ -43,7 +42,7 @@ namespace impala { /// methods. /// /// Usage: - /// The easiest way to use this class is to call the `ExecuteAndFetchAllText` function + /// The easiest way to use this class is to call the `ExecuteAndFetchAllHS2` function /// which runs the provided sql, returns all the results, and closes the query and /// session. This function is useful for running create/insert/update queries that do /// not return many results. @@ -115,30 +114,37 @@ namespace impala { /// session. No authentication is performed. Blocks until result rows are available. /// Then, populates all result rows. Finally, cleans up the query and session. /// - /// Intended for use as a convenience method when query results are small. - /// /// Parameters: - /// `user_name` Specifies the username that will be reported as running this - /// query. - /// `sql` Text of the sql query/ddl/dml to run. - /// `results` Output parameter containing all result rows from the query. If - /// this vector has existing elements, they will be left in place with - /// result rows added at the end of the vector. - /// `columns` Optional output parameter where each element is a pair with the - /// first element being the name of the column and the second element - /// being the column type. Existing elements in the vector will be - /// left in place with column pairs appended to the end of the vector. - /// If this parameter is `nullptr`, then the list of columns is not - /// generated and this parameter's value will remain `nullptr`. - /// `query_id` Optional output parameter, if specified, it will be overwritten - /// with the id of the query that was executed. Since the query is - /// closed by this function, the query id is informational only. + /// `user_name` Specifies the username that will be reported as running this + /// query. + /// `sql` Text of the sql query/ddl/dml to run. + /// `results` Output parameter containing all result rows from the query. + /// If this vector has existing elements, they will be left in + /// place with result rows added at the end of the vector. + /// `query_opts` Optional, contains query options that will apply to all + /// queries executed by this session opened by this function. + /// `persist_in_db` Optional boolean indicating if the query data should be + /// written to the completed queries table after it is closed. + /// Defaults to `true`. + /// `columns` Optional output parameter where each element is a pair with + /// the first element being the name of the column and the second + /// element being the column type. Existing elements in the vector + /// will be left in place with column pairs appended to the end of + /// the vector. If this parameter is `nullptr`, then the list of + /// columns is not generated and this parameter's value will + /// remain `nullptr`. + /// `query_id` Optional output parameter, if specified, it will be + /// overwritten with the id of the query that was executed. Since + /// the query is closed by this function, the query id is + /// informational only. /// /// Return: /// `impala::Status` indicating the result of submitting the query and waiting for /// it to return. - virtual Status ExecuteAndFetchAllText(const std::string& user_name, - const std::string& sql, query_results& results, + virtual Status ExecuteAndFetchAllHS2(const std::string& user_name, + const std::string& sql, + std::vector<apache::hive::service::cli::thrift::TRow>& results, + const QueryOptionMap& query_opts = {}, const bool persist_in_db = true, results_columns* columns = nullptr, TUniqueId* query_id = nullptr) = 0; /// Creates a new session under the specified user and submits a query under that @@ -152,9 +158,9 @@ namespace impala { /// query. /// `sql` Text of the sql query/ddl/dml to run. /// `new_session_id` Output parameter that will be set to the id of the - /// newly created session. + /// newly created session. /// `new_query_id` Output parameter that will be set to the id of the - /// newly started query. + /// newly started query. /// `query_opts` Optional, contains query options that will apply to all /// queries executed by this session opened by this function. /// `persist_in_db` Optional boolean indicating if the query data should be @@ -193,9 +199,10 @@ namespace impala { const impala::TUniqueId& session_id, TUniqueId& new_query_id, const bool persist_in_db = true) = 0; - /// Retrieves all result rows for a given query. The query must have already been - /// submitted and one of the Wait methods called on the query to ensure results are - /// available. + /// Retrieves all result rows for a given query. The rows are retrieved using HS2 + /// objects. Thus, each row and column is stored within its own object. The query + /// must have already been submitted and one of the Wait methods called on the query + /// to ensure results are available. /// /// Note: Assumes the query represented by `query_id` was successful as this /// function does not check that the query status is a successful status. @@ -214,7 +221,8 @@ namespace impala { /// remain `nullptr`. /// Return: /// `impala::Status` Indicates the result of fetching rows. - virtual Status FetchAllRows(const TUniqueId& query_id, query_results& results, + virtual Status FetchAllRowsHS2(const TUniqueId& query_id, + std::vector<apache::hive::service::cli::thrift::TRow>& query_results, results_columns* columns = nullptr) = 0; /// Closes and cleans up the query and its associated session. @@ -231,7 +239,6 @@ namespace impala { /// server connections added to the end. virtual void GetConnectionContextList( ThriftServer::ConnectionContextList* connection_contexts) = 0; - }; // InternalServer class } // namespace impala
