[ https://issues.apache.org/jira/browse/HIVE-25842?focusedWorklogId=710591&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-710591 ]
ASF GitHub Bot logged work on HIVE-25842: ----------------------------------------- Author: ASF GitHub Bot Created on: 18/Jan/22 15:39 Start Date: 18/Jan/22 15:39 Worklog Time Spent: 10m Work Description: lcspinter commented on a change in pull request #2916: URL: https://github.com/apache/hive/pull/2916#discussion_r786756287 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ########## @@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception { super.init(stop); this.workerName = getWorkerId(); setName(workerName); + metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) && Review comment: Done ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception { cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory( conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM), COMPACTOR_CLEANER_THREAD_NAME_FORMAT); + metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON); Review comment: It doesn't hurt if we double check :) ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ########## @@ -142,6 +144,9 @@ public void init(AtomicBoolean stop) throws Exception { super.init(stop); this.workerName = getWorkerId(); setName(workerName); + metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) && + MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON); Review comment: Do we want to update metrics, when the initiator/Cleaner is not running? Can that be a valid use case? 
########## File path: ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java ########## @@ -81,6 +81,7 @@ public void setUp() throws Exception { MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true); Review comment: We need this flag set `true`, otherwise the metrics are not collected. ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java ########## @@ -8831,6 +8833,34 @@ public void mark_failed(CompactionInfoStruct cr) throws MetaException { getTxnHandler().markFailed(CompactionInfo.compactionStructToInfo(cr)); } + @Override + public CompactionMetricsDataResponse get_compaction_metrics_data(String dbName, String tblName, String partitionName, CompactionMetricsMetricType type) throws MetaException { + CompactionMetricsData metricsData = + getTxnHandler().getCompactionMetricsData(dbName, tblName, partitionName, + CompactionMetricsDataConverter.thriftCompactionMetricType2DbType(type)); + CompactionMetricsDataResponse response = new CompactionMetricsDataResponse(); + if (metricsData != null) { + response.setData(CompactionMetricsDataConverter.dataToStruct(metricsData)); + } + return response; + } + + @Override + public boolean update_compaction_metrics_data(CompactionMetricsDataStruct struct, int version) throws MetaException { + return getTxnHandler().updateCompactionMetricsData(CompactionMetricsDataConverter.structToData(struct), version); Review comment: Per java doc, the object must be always non-null. 
########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ########## @@ -671,6 +679,13 @@ private String getWorkerId() { return name.toString(); } + private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName, + CompactionType type) { + if (metricsEnabled) { + DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc); Review comment: All the `updateMetricsFrom*` methods are static. They are completely stateless, and the outcome of the metrics computation is stored in the backend DB, which is accessible by all the compaction threads regardless of which process is hosting them. ########## File path: service/src/java/org/apache/hive/service/server/HiveServer2.java ########## @@ -214,9 +214,6 @@ public synchronized void init(HiveConf hiveConf) { try { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_METRICS_ENABLED)) { MetricsFactory.init(hiveConf); - if (MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) { - DeltaFilesMetricReporter.init(hiveConf); Review comment: Good catch! ########## File path: standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql ########## @@ -661,6 +661,16 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); +-- HIVE-25842 +CREATE TABLE COMPACTION_METRICS_CACHE ( + CMC_DATABASE varchar(128) NOT NULL, + CMC_TABLE varchar(128) NOT NULL, + CMC_PARTITION varchar(767), + CMC_METRIC_TYPE varchar(128) NOT NULL, + CMC_METRIC_VALUE integer NOT NULL, Review comment: I think if for some reason the value is null, that row shouldn't be in the table. 
########## File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift ########## @@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req) void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1) void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1) void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1) + CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1) Review comment: Yes, it makes sense to change the param to a request object. ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ########## @@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException { } } + @Override + public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName, + CompactionMetricsData.MetricType type) throws MetaException { + Connection dbConn = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String query = SELECT_COMPACTION_METRICS_CACHE_QUERY; + if (partitionName != null) { + query += " AND \"CMC_PARTITION\" = ?"; + } else { + query += " AND \"CMC_PARTITION\" IS NULL"; + } + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setString(1, dbName); + pstmt.setString(2, tblName); + pstmt.setString(3, type.toString()); + if (partitionName != null) { + pstmt.setString(4, partitionName); + } + ResultSet resultSet = pstmt.executeQuery(); + CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder(); + if (resultSet.next()) { + return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type) + .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build(); + } else { + return null; + } + } + + } catch (SQLException 
e) { + LOG.error("Unable to getDeltaMetricsInfo"); + checkRetryable(e, "getDeltaMetricsInfo"); + throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e)); Review comment: The whole logic on the `DeltaFilesMetricReporter` is wrapped in a huge try-catch block, that catches every `Throwable`, so this shouldn't tackle the compaction threads. ########## File path: common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsFactory.java ########## @@ -34,10 +34,10 @@ /** * Initializes static Metrics instance. */ - public synchronized static void init(HiveConf conf) throws Exception { + public synchronized static void init(Configuration conf) throws Exception { Review comment: Two reasons: 1. It's always better to code to the interface 2. The initiator, where this method is called, has an instance of HiveMestoreConf. ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ########## @@ -308,7 +307,22 @@ "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXN_COMPONENTS\" " + "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING COUNT(\"TXN_ID\") > ?"; - + private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY = + "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " + + "WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?"; + private static final String NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY = + "* FROM \"COMPACTION_METRICS_CACHE\" WHERE \"CMC_METRIC_TYPE\" = ? ORDER BY \"CMC_METRIC_VALUE\" DESC"; Review comment: Correct. Changed it to select column names. 
########## File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift ########## @@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req) void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1) void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1) void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1) + CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1) + bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1) + void add_compaction_metrics_data(1: CompactionMetricsDataStruct data) throws(1:MetaException o1) + void remove_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1) Review comment: Done ########## File path: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ########## @@ -4112,6 +4126,189 @@ public MetricsInfo getMetricsInfo() throws MetaException { } } + @Override + public CompactionMetricsData getCompactionMetricsData(String dbName, String tblName, String partitionName, + CompactionMetricsData.MetricType type) throws MetaException { + Connection dbConn = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String query = SELECT_COMPACTION_METRICS_CACHE_QUERY; + if (partitionName != null) { + query += " AND \"CMC_PARTITION\" = ?"; + } else { + query += " AND \"CMC_PARTITION\" IS NULL"; + } + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setString(1, dbName); + pstmt.setString(2, tblName); + pstmt.setString(3, type.toString()); + if (partitionName != null) { + pstmt.setString(4, partitionName); + } + ResultSet resultSet = pstmt.executeQuery(); + 
CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder(); + if (resultSet.next()) { + return builder.dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type) + .metricValue(resultSet.getInt(1)).version(resultSet.getInt(2)).build(); + } else { + return null; + } + } + + } catch (SQLException e) { + LOG.error("Unable to getDeltaMetricsInfo"); + checkRetryable(e, "getDeltaMetricsInfo"); + throw new MetaException("Unable to execute getDeltaMetricsInfo()" + StringUtils.stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + return getCompactionMetricsData(dbName, tblName, partitionName, type); + } + } + + @Override + public List<CompactionMetricsData> getTopCompactionMetricsDataPerType(int limit) + throws MetaException { + Connection dbConn = null; + List<CompactionMetricsData> metricsDataList = new ArrayList<>(); + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) { + String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY); + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setString(1, type.toString()); + ResultSet resultSet = pstmt.executeQuery(); + while (resultSet.next()) { + CompactionMetricsData.Builder builder = new CompactionMetricsData.Builder(); + metricsDataList.add(builder + .dbName(resultSet.getString(1)) + .tblName(resultSet.getString(2)) + .partitionName(resultSet.getString(3)) + .metricType(type) + .metricValue(resultSet.getInt(5)) + .version(resultSet.getInt(6)) + .build()); + } + } + } + } catch (SQLException e) { + LOG.error("Unable to getCompactionMetricsDataForType"); + checkRetryable(e, "getCompactionMetricsDataForType"); + throw new MetaException("Unable to execute getCompactionMetricsDataForType()" + stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch 
(RetryException e) { + return getTopCompactionMetricsDataPerType(limit); + } + return metricsDataList; + } + + @Override + public boolean updateCompactionMetricsData(CompactionMetricsData data, int version) throws MetaException { + Connection dbConn = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY; + if (data.getPartitionName() != null) { + query += " AND \"CMC_PARTITION\" = ?"; + } else { + query += " AND \"CMC_PARTITION\" IS NULL"; + } + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setInt(1, data.getMetricValue()); + pstmt.setInt(2, data.getVersion()); + pstmt.setString(3, data.getDbName()); + pstmt.setString(4, data.getTblName()); + pstmt.setString(5, data.getMetricType().toString()); + pstmt.setInt(6, version); + if (data.getPartitionName() != null) { + pstmt.setString(7, data.getPartitionName()); + } + boolean updateRes = pstmt.executeUpdate() > 0; + dbConn.commit(); + return updateRes; + } + } catch (SQLException e) { + rollbackDBConn(dbConn); + checkRetryable(e, "updateCompactionMetricsData(" + data + ", " + version + ")"); + throw new MetaException("Unable to execute updateCompactionMetricsData()" + stringifyException(e)); + } finally { + closeDbConn(dbConn); + } + } catch (RetryException e) { + updateCompactionMetricsData(data, version); + } + return true; + } + + @Override + public void addCompactionMetricsData(CompactionMetricsData data) throws MetaException { + Connection dbConn = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) { + pstmt.setString(1, data.getDbName()); + pstmt.setString(2, data.getTblName()); + pstmt.setString(3, data.getPartitionName()); Review comment: Per definition, the `CMC_PARTITION` column can accept null values. 
The `PreparedStatement` can convert java null values to db specific null values. ########## File path: standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift ########## @@ -2926,11 +2945,15 @@ PartitionsResponse get_partitions_req(1:PartitionsRequest req) void mark_cleaned(1:CompactionInfoStruct cr) throws(1:MetaException o1) void mark_compacted(1: CompactionInfoStruct cr) throws(1:MetaException o1) void mark_failed(1: CompactionInfoStruct cr) throws(1:MetaException o1) + CompactionMetricsDataResponse get_compaction_metrics_data(1: string dbName, 2: string tblName, 3: string partitionName, 4: CompactionMetricsMetricType type) throws(1:MetaException o1) + bool update_compaction_metrics_data(1: CompactionMetricsDataStruct data, 2: i32 version) throws(1:MetaException o1) Review comment: I moved the update and add together. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa } StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream() .map(Path::getName).collect(Collectors.joining(","))); - return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size()); + return success; Review comment: `dir.getObsolete()` doesn't include the aborted files.
We have a different function for that `dir.getAbortedDirectories()` ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -87,14 +106,15 @@ public void init(AtomicBoolean stop) throws Exception { cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory( conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM), COMPACTOR_CLEANER_THREAD_NAME_FORMAT); + metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) && Review comment: Yes, this was intentional ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa } StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream() .map(Path::getName).collect(Collectors.joining(","))); - return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size()); Review comment: Good idea! ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ########## @@ -396,7 +423,9 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa } StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream() .map(Path::getName).collect(Collectors.joining(","))); - return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo); + updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete().size()); Review comment: The soft-drop partition is called when the partition was dropped before the cleaner could clean it. 
Since the partition was already dropped, the `TxnHandler.cleanupRecords` must have been called, which removes all the records from the table that belongs to that particular partition. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ########## @@ -381,17 +397,19 @@ public CompactionType run() throws Exception { } } - private CompactionType determineCompactionType(CompactionInfo ci, ValidWriteIdList writeIds, - StorageDescriptor sd, Map<String, String> tblproperties) + private AcidDirectory getAcidDirectory(StorageDescriptor sd,ValidWriteIdList writeIds) throws IOException { Review comment: fixed ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java ########## @@ -331,6 +339,12 @@ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, } return false; } + + private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName) { Review comment: The two methods have different signatures. ########## File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ########## @@ -671,6 +679,13 @@ private String getWorkerId() { return name.toString(); } + private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName, Review comment: I will create a connection pool in a follow-up PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 710591) Time Spent: 2h 10m (was: 2h) > Reimplement delta file metric collection > ---------------------------------------- > > Key: HIVE-25842 > URL: https://issues.apache.org/jira/browse/HIVE-25842 > Project: Hive > Issue Type: Improvement > Reporter: László Pintér > Assignee: László Pintér > Priority: Major > Labels: pull-request-available > Time Spent: 2h 10m > Remaining Estimate: 0h > > FUNCTIONALITY: Metrics are collected only when a Tez query runs a table > (select * and select count( * ) don't update the metrics) > Metrics aren't updated after compaction or cleaning after compaction, so > users will probably see "issues" with compaction (like many active or > obsolete or small deltas) that don't exist. > RISK: Metrics are collected during queries – we tried to put a try-catch > around each method in DeltaFilesMetricsReporter but of course this isn't > foolproof. This is a HUGE performance and functionality liability. Tests > caught some issues, but our tests aren't perfect. -- This message was sent by Atlassian Jira (v8.20.1#820001)