This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 650758ffd93b326e3bede4822d4d716d7b2aa852 Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Wed May 10 19:01:36 2023 +0200 IMPALA-12130: Table creation time is not set properly in lineage log for Kudu and Iceberg tables For CTAS statements that create Kudu/Iceberg tables, the lineage log was incomplete because it was missing the table creation time of the newly created table. This information was missing because createKuduTable() / createIcebergTable() in CatalogOpExecutor did not set it in the TDdlExecResponse object. This patch adds the missing information. Testing * e2e test Change-Id: I6938938b1834809d5197a748c171e9a09e13906a Reviewed-on: http://gerrit.cloudera.org:8080/19868 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Gabor Kaszab <[email protected]> --- .../apache/impala/analysis/ColumnLineageGraph.java | 2 +- .../apache/impala/service/CatalogOpExecutor.java | 30 ++++++++++++-- tests/custom_cluster/test_lineage.py | 47 ++++++++++++++++------ 3 files changed, 62 insertions(+), 17 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java index 9ba1f6371..75900cb18 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java +++ b/fe/src/main/java/org/apache/impala/analysis/ColumnLineageGraph.java @@ -497,7 +497,7 @@ public class ColumnLineageGraph { feTable.getMetaStoreTable().getCreateTime()); } else { // -1 is just a placeholder that will be updated after the table/view has been - // created. See impala-server.cc (LogLineageRecord) for more information. + // created. See client-request-state.cc (LogLineageRecord) for more information. 
metadata = new Metadata(target.tableName_.toString(), -1); } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 73bd44474..b7726c5e4 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -3495,6 +3495,10 @@ public class CatalogOpExecutor { Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventTblPair = getTableFromEvents(events, params.if_not_exists); createEventId = eventTblPair == null ? -1 : eventTblPair.first; + org.apache.hadoop.hive.metastore.api.Table msTable = + eventTblPair == null ? null : eventTblPair.second; + setTableNameAndCreateTimeInResponse(msTable, + newTable.getDbName(), newTable.getTableName(), response); // Add the table to the catalog cache Table newTbl = catalog_ @@ -3574,9 +3578,8 @@ public class CatalogOpExecutor { .getTable(newTable.getDbName(), newTable.getTableName())); } msTable = eventIdTblPair.second; - long tableCreateTime = msTable.getCreateTime(); - response.setTable_name(newTable.getDbName() + "." + newTable.getTableName()); - response.setTable_create_time(tableCreateTime); + setTableNameAndCreateTimeInResponse(msTable, newTable.getDbName(), + newTable.getTableName(), response); // For external tables set table location needed for lineage generation. if (newTable.getTableType() == TableType.EXTERNAL_TABLE.toString()) { String tableLocation = newTable.getSd().getLocation(); @@ -3634,6 +3637,23 @@ public class CatalogOpExecutor { return true; } + /** + * Sets table name and creation time in 'response' based on 'msTable'. + * If 'msTable' is null, then it loads the table from HMS. + * Throws exception if table is not found. 
+ */ + private void setTableNameAndCreateTimeInResponse( + org.apache.hadoop.hive.metastore.api.Table msTable, String dbName, String tblName, + TDdlExecResponse response) throws org.apache.thrift.TException { + if (msTable == null) { + try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) { + msTable = msClient.getHiveClient().getTable(dbName, tblName); + } + } + response.setTable_name(dbName + "." + tblName); + response.setTable_create_time(msTable.getCreateTime()); + } + /** * Creates a new view in the metastore and adds an entry to the metadata cache to * lazily load the new metadata on the next access. Re-throws any Metastore @@ -3762,6 +3782,10 @@ public class CatalogOpExecutor { Pair<Long, org.apache.hadoop.hive.metastore.api.Table> eventTblPair = getTableFromEvents(events, ifNotExists); long createEventId = eventTblPair == null ? -1 : eventTblPair.first; + org.apache.hadoop.hive.metastore.api.Table msTable = + eventTblPair == null ? null : eventTblPair.second; + setTableNameAndCreateTimeInResponse(msTable, + newTable.getDbName(), newTable.getTableName(), response); // Add the table to the catalog cache Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(), newTable.getTableName(), TImpalaTableType.TABLE, tblComment, diff --git a/tests/custom_cluster/test_lineage.py b/tests/custom_cluster/test_lineage.py index 6551d545e..a670b428f 100644 --- a/tests/custom_cluster/test_lineage.py +++ b/tests/custom_cluster/test_lineage.py @@ -82,11 +82,25 @@ class TestLineage(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}" .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR)) - def test_create_table_timestamp(self, vector, unique_database): + def test_create_table_timestamp(self, unique_database): + for table_format in ['textfile', 'kudu', 'iceberg']: + self.run_test_create_table_timestamp(unique_database, table_format) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + 
"--lineage_event_log_dir={0}" + .format(CREATE_TABLE_TIME_LINEAGE_LOG_DIR), + catalogd_args="--hms_event_polling_interval_s=0") + def test_create_table_timestamp_without_hms_events(self, unique_database): + for table_format in ['textfile', 'kudu', 'iceberg']: + self.run_test_create_table_timestamp(unique_database, table_format) + + def run_test_create_table_timestamp(self, unique_database, table_format): """Test that 'createTableTime' in the lineage graph are populated with valid value from HMS.""" - query = "create table {0}.lineage_test_tbl as select int_col, tinyint_col " \ - "from functional.alltypes".format(unique_database) + query = "create table {0}.lineage_test_tbl_{1} primary key (int_col) stored as {1} " \ + "as select int_col, bigint_col from functional.alltypes".format( + unique_database, table_format) result = self.execute_query_expect_success(self.client, query) profile_query_id = re.search("Query \(id=(.*)\):", result.runtime_profile).group(1) @@ -98,16 +112,23 @@ class TestLineage(CustomClusterTestSuite): # Only the coordinator's log file will be populated. if os.path.getsize(log_path) > 0: with open(log_path) as log_file: - lineage_json = json.load(log_file) - assert lineage_json["queryId"] == profile_query_id - vertices = lineage_json["vertices"] - for vertex in vertices: - if vertex["vertexId"] == "int_col": - assert "metadata" in vertex - table_name = vertex["metadata"]["tableName"] - table_create_time = int(vertex["metadata"]["tableCreateTime"]) - assert "{0}.lineage_test_tbl".format(unique_database) == table_name - assert table_create_time != -1 + for line in log_file: + # Now that the test is executed multiple times we need to take a look at + # only the line that contains the expected table name. 
+ expected_table_name =\ + "{0}.lineage_test_tbl_{1}".format(unique_database, table_format) + if expected_table_name not in line: continue + + lineage_json = json.loads(line) + assert lineage_json["queryId"] == profile_query_id + vertices = lineage_json["vertices"] + for vertex in vertices: + if vertex["vertexId"] == "int_col": + assert "metadata" in vertex + table_name = vertex["metadata"]["tableName"] + table_create_time = int(vertex["metadata"]["tableCreateTime"]) + assert expected_table_name == table_name + assert table_create_time != -1 @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--lineage_event_log_dir={0}"
