This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4eafaf87b5f129dc4fbe9f285599492122e12e1c Author: Michael Smith <[email protected]> AuthorDate: Wed Sep 11 13:59:28 2024 -0700 IMPALA-13322: Fix alter on SystemTables Fixes SystemTable.load to handle incremental loads - such as during alter - and correctly update the metastore Table so subsequent operations see the correct version of the table schema. Updates QueryScanner to allow unknown columns and log a warning. This is primarily needed when another coordinator has been upgraded and modified table metadata with new columns that this coordinator doesn't recognize. Adds TestQueryLive::test_alter to test that altering the table works and the table is usable afterwards, including attempting to query unknown columns as we might encounter them during upgrades. Change-Id: I9a59e58c086e659941e0db8a2b893ac6dcc5143a Reviewed-on: http://gerrit.cloudera.org:8080/21792 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/system-table-scanner.cc | 12 ++++-- be/src/exec/system-table-scanner.h | 10 +++-- .../org/apache/impala/catalog/SystemTable.java | 28 +++++++++---- tests/custom_cluster/test_query_live.py | 46 ++++++++++++++++++++-- 4 files changed, 78 insertions(+), 18 deletions(-) diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc index 417a839d7..c4af35f42 100644 --- a/be/src/exec/system-table-scanner.cc +++ b/be/src/exec/system-table-scanner.cc @@ -55,7 +55,7 @@ Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* pr TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) { switch (table_name) { case TSystemTableName::IMPALA_QUERY_LIVE: - *scanner = make_unique<QueryScanner>(state, profile); + *scanner = make_unique<QueryScanner>(state, profile, table_name); break; default: return Status(ErrorMsg(TErrorCode::NOT_IMPLEMENTED_ERROR, @@ -112,8 +112,9 @@ static void WriteDecimalSlot( DCHECK(!overflow); } 
-QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile) - : SystemTableScanner(state, profile), +QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile, + TSystemTableName::type table_name) + : SystemTableScanner(state, profile, table_name), active_query_collection_timer_(ADD_TIMER(profile_, "ActiveQueryCollectionTime")), pending_query_collection_timer_(ADD_TIMER(profile_, "PendingQueryCollectionTime")) {} @@ -370,7 +371,10 @@ Status QueryScanner::MaterializeNextTuple( } break; default: - DCHECK(false) << "Unknown column position " << slot_desc->col_pos(); + LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << ") added" + " to table " << table_name_ << "; check if a coordinator was upgraded"; + tuple->SetNull(slot_desc->null_indicator_offset()); + break; } } diff --git a/be/src/exec/system-table-scanner.h b/be/src/exec/system-table-scanner.h index 92fbd6a78..9afacde40 100644 --- a/be/src/exec/system-table-scanner.h +++ b/be/src/exec/system-table-scanner.h @@ -42,8 +42,9 @@ class SystemTableScanner { bool eos() const noexcept { return eos_; } protected: - SystemTableScanner(RuntimeState* state, RuntimeProfile* profile) - : state_(state), profile_(profile), eos_(false) {} + SystemTableScanner(RuntimeState* state, RuntimeProfile* profile, + TSystemTableName::type table_name) + : state_(state), profile_(profile), table_name_(table_name), eos_(false) {} /// Write a string value to a STRING slot, allocating memory from 'pool'. Returns /// an error if memory cannot be allocated without exceeding a memory limit. 
@@ -54,13 +55,16 @@ class SystemTableScanner { RuntimeProfile* const profile_; + const TSystemTableName::type table_name_; + /// if true, nothing left to return in getNext() in SystemTableScanNode bool eos_; }; class QueryScanner : public SystemTableScanner { public: - QueryScanner(RuntimeState* state, RuntimeProfile* profile); + QueryScanner(RuntimeState* state, RuntimeProfile* profile, + TSystemTableName::type table_name); /// Start scan, load list of query IDs into active_query_ids_. virtual Status Open(); diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java index b8622e688..180dc9fac 100644 --- a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java @@ -37,7 +37,6 @@ import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; import org.apache.impala.util.EventSequence; import org.apache.impala.util.TResultRowBuilder; -import com.google.common.base.Preconditions; /** * Represents a system table reflecting backend internal state. @@ -99,12 +98,27 @@ public final class SystemTable extends Table implements FeSystemTable { public void load(boolean reuseMetadata, IMetaStoreClient client, org.apache.hadoop.hive.metastore.api.Table msTbl, String reason, EventSequence catalogTimeline) throws TableLoadingException { - int pos = colsByPos_.size(); - // Should be no partition columns. - Preconditions.checkState(pos == 0); - for (FieldSchema s: msTbl.getSd().getCols()) { - Type type = FeCatalogUtils.parseColumnType(s, getName()); - addColumn(new Column(s.getName(), type, s.getComment(), pos++)); + if (msTbl.getPartitionKeysSize() > 0) { + throw new TableLoadingException( + "System table cannot contain clustering columns: " + name_); + } + + Table.LOADING_TABLES.incrementAndGet(); + try { + // Reload all columns. 
+ clearColumns(); + numClusteringCols_ = 0; + int pos = 0; + for (FieldSchema s: msTbl.getSd().getCols()) { + addColumn(new Column(s.getName(), parseColumnType(s), s.getComment(), pos++)); + } + + // Ensure table metadata points to the latest version. + setMetaStoreTable(msTbl); + refreshLastUsedTime(); + } finally { + // Ensure this is decremented in case an exception is thrown. + Table.LOADING_TABLES.decrementAndGet(); } } diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index 3a2595cd0..2df6036ae 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -42,13 +42,16 @@ class TestQueryLive(CustomClusterTestSuite): def assert_describe_extended(self): describe_ext_result = self.execute_query('describe extended sys.impala_query_live') - assert len(describe_ext_result.data) == 80 + # Alter can add additional event fields. Filter them out. + describe_ext_data = [ + line for line in describe_ext_result.data if 'impala.events.catalog' not in line] + assert len(describe_ext_data) == 80 system_table_re = re.compile(r'__IMPALA_SYSTEM_TABLE\s+true') - assert list(filter(system_table_re.search, describe_ext_result.data)) + assert list(filter(system_table_re.search, describe_ext_data)) external_re = re.compile(r'EXTERNAL\s+TRUE') - assert list(filter(external_re.search, describe_ext_result.data)) + assert list(filter(external_re.search, describe_ext_data)) external_table_re = re.compile(r'Table Type:\s+EXTERNAL_TABLE') - assert list(filter(external_table_re.search, describe_ext_result.data)) + assert list(filter(external_table_re.search, describe_ext_data)) def assert_impalads(self, profile, present=[0, 1, 2], absent=[]): for port_idx in present: @@ -207,6 +210,41 @@ class TestQueryLive(CustomClusterTestSuite): assert_query('sys.impala_query_live', self.client, 'test_query_live', result.runtime_profile) + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + 
"--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt") + def test_alter(self): + """Asserts alter works on query live table.""" + column_desc = 'test_alter\tstring\t' + + add_column = self.execute_query( + 'alter table sys.impala_query_live add columns(test_alter string)') + assert add_column.data == ['New column(s) have been added to the table.'] + + try: + describe_column = self.execute_query('describe sys.impala_query_live') + assert len(describe_column.data) == 50 + assert column_desc in describe_column.data + + select_column = self.execute_query( + 'select test_alter from sys.impala_query_live limit 1') + assert select_column.data == ['NULL'] + self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 49\)' + + ' added to table IMPALA_QUERY_LIVE; check if a coordinator was upgraded') + finally: + # Ensure new column is dropped in case of test failure + drop_column = self.execute_query( + 'alter table sys.impala_query_live drop test_alter') + + assert drop_column.data == ['Column has been dropped.'] + + describe_column2 = self.execute_query('describe sys.impala_query_live') + assert len(describe_column2.data) == 49 + assert column_desc not in describe_column2.data + + select_column2 = self.execute_query('select * from sys.impala_query_live') + assert len(select_column2.data) > 1 + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--cluster_id=test_query_live", catalogd_args="--enable_workload_mgmt",
