[ https://issues.apache.org/jira/browse/HIVE-25195?focusedWorklogId=606605&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-606605 ]
ASF GitHub Bot logged work on HIVE-25195:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jun/21 08:03
            Start Date: 04/Jun/21 08:03
    Worklog Time Spent: 10m

Work Description: pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644897608


##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
   Is this a bugfix? Do we expect null here?

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
(same hunk as above)
##########

Review comment:
   Oh... I found the reason behind it 😄

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }

-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
   Maybe Optional?
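For reference, a minimal sketch of what the suggested `Optional` variant could look like at the `commitInsertTable` call site, assuming `getJobContextForCommitOrAbort` were changed to return `java.util.Optional<JobContext>` (an illustration of the review suggestion, not code from this PR):

```java
// Hypothetical refactor: an empty Optional replaces the null check,
// so the missing-commit-info case cannot be silently forgotten by callers.
Optional<JobContext> maybeContext = getJobContextForCommitOrAbort(tableName, overwrite);
if (maybeContext.isPresent()) {
  try {
    new HiveIcebergOutputCommitter().commitJob(maybeContext.get());
  } catch (Exception e) {
    LOG.error("Error while trying to commit job", e);
    throw new MetaException(StringUtils.stringifyException(e));
  }
}
```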
##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);

-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
   Do we need both keys? If we have `CTAS_TABLE_NAME`, then `IS_CTAS_QUERY` is implied to be `true`.
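The suggestion amounts to letting the presence of the table name itself carry the flag. A rough sketch of that idea, assuming `SessionStateUtil` exposes a `getProperty(Configuration, String)` lookup returning `Optional<String>` (a hypothetical signature used here for illustration, not confirmed from the PR):

```java
// Write side: store only the CTAS table name; no separate boolean key.
SessionStateUtil.addResource(configuration, InputFormatConfig.CTAS_TABLE_NAME,
    serDeProperties.get(Catalogs.NAME));

// Read side: the query is a CTAS exactly when the table name was recorded.
// (getProperty returning Optional<String> is an assumed helper.)
boolean isCtasQuery = SessionStateUtil
    .getProperty(configuration, InputFormatConfig.CTAS_TABLE_NAME)
    .isPresent();
```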
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
         // get all target tables this vertex wrote to
         List<String> tables = new ArrayList<>();
         for (Map.Entry<String, String> entry : jobConf) {

Review comment:
   Do we have a faster solution for this? The `jobConf` could be very big.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
         // get all target tables this vertex wrote to
         List<String> tables = new ArrayList<>();
         for (Map.Entry<String, String> entry : jobConf) {
-          if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-            tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+          if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+            tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
           }
         }
-        // save information for each target table (jobID, task num, query state)
+        // find iceberg props in jobConf as they can be needed, but not available, during job commit
+        Map<String, String> icebergProperties = new HashMap<>();
+        jobConf.forEach(e -> {

Review comment:
   We iterated through the `jobConf` a few lines ago. It might be worth merging the loops, if all else fails.
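Merging the two passes, as suggested, could look roughly like the following single loop (a sketch assembled from the names in the diff above; the surrounding method code is elided):

```java
List<String> tables = new ArrayList<>();
Map<String, String> icebergProperties = new HashMap<>();
// One pass over jobConf: collect the target table names and copy the remaining
// iceberg.mr.* properties, skipping the bulky serialized table entries.
for (Map.Entry<String, String> entry : jobConf) {
  String key = entry.getKey();
  if (key.startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
    tables.add(key.substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
  } else if (key.startsWith("iceberg.mr.")) {
    icebergProperties.put(key, entry.getValue());
  }
}
```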
##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
         // get all target tables this vertex wrote to
         List<String> tables = new ArrayList<>();
         for (Map.Entry<String, String> entry : jobConf) {
-          if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-            tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+          if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+            tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
           }
         }
-        // save information for each target table (jobID, task num, query state)
+        // find iceberg props in jobConf as they can be needed, but not available, during job commit
+        Map<String, String> icebergProperties = new HashMap<>();
+        jobConf.forEach(e -> {
+          // don't copy the serialized tables, they're not needed anymore and take up lots of space
+          if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+            icebergProperties.put(e.getKey(), e.getValue());
+          }
+        });
+        // save information for each target table (jobID, task num)
         for (String table : tables) {
-          sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-          sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-              status.getProgress().getSucceededTaskCount());
+          SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
   This is a little bit odd to me. I understand what you did, but it still feels strange. Convince me 😄


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 606605)
    Time Spent: 2h 10m  (was: 2h)

> Store Iceberg write commit and ctas information in QueryState
> --------------------------------------------------------------
>
>                 Key: HIVE-25195
>                 URL: https://issues.apache.org/jira/browse/HIVE-25195
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> We should replace the current method of passing Iceberg write commit
> information (jobID, task num) and CTAS info via the session conf using
> prefixed keys. We now have a cleaner way of doing this, using the
> QueryState object. This should make the code easier to maintain and guard
> against accidental session conf pollution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)