[ https://issues.apache.org/jira/browse/HIVE-25006?focusedWorklogId=585731&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-585731 ]
ASF GitHub Bot logged work on HIVE-25006: ----------------------------------------- Author: ASF GitHub Bot Created on: 20/Apr/21 11:41 Start Date: 20/Apr/21 11:41 Worklog Time Spent: 10m Work Description: pvary commented on a change in pull request #2161: URL: https://github.com/apache/hive/pull/2161#discussion_r616603042 ########## File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java ########## @@ -256,4 +265,74 @@ private static PartitionSpec spec(Schema schema, Properties properties, return PartitionSpec.unpartitioned(); } } + + @Override + public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString(); + + // check status to determine whether we need to commit or to abort + JobConf jobConf = new JobConf(conf); + String queryIdKey = jobConf.get("hive.query.id") + "." + tableName + ".result"; + boolean success = jobConf.getBoolean(queryIdKey, false); + + // construct the job context + JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName)); + int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, -1); + jobConf.setNumReduceTasks(numTasks); + JobContext jobContext = new JobContextImpl(jobConf, jobID, null); + + // we should only commit this current table because + // for multi-table inserts, this hook method will be called sequentially for each target table + jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName); + + OutputCommitter committer = new HiveIcebergOutputCommitter(); + try { + if (success) { + try { + committer.commitJob(jobContext); + } catch (Exception commitExc) { + LOG.error("Error while trying to commit job (table: {}, jobID: {}). 
Will abort it now.", + tableName, jobID, commitExc); + abortJob(jobContext, committer, true); + throw new MetaException("Unable to commit job: " + commitExc.getMessage()); + } + } else { + abortJob(jobContext, committer, false); + } + } finally { + // avoid config pollution with prefixed/suffixed keys + cleanCommitConfig(queryIdKey, tableName); + } + } + + private void abortJob(JobContext jobContext, OutputCommitter committer, boolean suppressExc) throws MetaException { + try { + committer.abortJob(jobContext, JobStatus.State.FAILED); + } catch (IOException abortExc) { + LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc); + if (!suppressExc) { + throw new MetaException("Unable to abort job: " + abortExc.getMessage()); + } + } + } + + private void cleanCommitConfig(String queryIdKey, String tableName) { + conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName); + conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName); + conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName); + conf.unset(queryIdKey); + } + + @Override + public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // do nothing + } + + @Override + public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite) + throws MetaException { + // do nothing Review comment: Shouldn't we call abortJob here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 585731)
Time Spent: 1h 20m (was: 1h 10m)

> Commit Iceberg writes in HiveMetaHook instead of TezAM
> ------------------------------------------------------
>
> Key: HIVE-25006
> URL: https://issues.apache.org/jira/browse/HIVE-25006
> Project: Hive
> Issue Type: Task
> Reporter: Marton Bod
> Assignee: Marton Bod
> Priority: Major
> Labels: pull-request-available
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Trigger the write commits in the HiveIcebergStorageHandler#commitInsertTable.
> This will enable us to implement insert overwrites for iceberg tables.

--
This message was sent by Atlassian Jira (v8.3.4#803005)