[ 
https://issues.apache.org/jira/browse/HIVE-25006?focusedWorklogId=585745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-585745
 ]

ASF GitHub Bot logged work on HIVE-25006:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/21 11:57
            Start Date: 20/Apr/21 11:57
    Worklog Time Spent: 10m 
      Work Description: marton-bod commented on a change in pull request #2161:
URL: https://github.com/apache/hive/pull/2161#discussion_r616613187



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -256,4 +265,74 @@ private static PartitionSpec spec(Schema schema, Properties properties,
       return PartitionSpec.unpartitioned();
     }
   }
+
+  @Override
+  public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite)
+      throws MetaException {
+    String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
+
+    // check status to determine whether we need to commit or to abort
+    JobConf jobConf = new JobConf(conf);
+    String queryIdKey = jobConf.get("hive.query.id") + "." + tableName + ".result";
+    boolean success = jobConf.getBoolean(queryIdKey, false);
+
+    // construct the job context
+    JobID jobID = JobID.forName(jobConf.get(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName));
+    int numTasks = conf.getInt(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName, -1);
+    jobConf.setNumReduceTasks(numTasks);
+    JobContext jobContext = new JobContextImpl(jobConf, jobID, null);
+
+    // we should only commit this current table because
+    // for multi-table inserts, this hook method will be called sequentially for each target table
+    jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+    OutputCommitter committer = new HiveIcebergOutputCommitter();
+    try {
+      if (success) {
+        try {
+          committer.commitJob(jobContext);
+        } catch (Exception commitExc) {
+          LOG.error("Error while trying to commit job (table: {}, jobID: {}). Will abort it now.",
+              tableName, jobID, commitExc);
+          abortJob(jobContext, committer, true);
+          throw new MetaException("Unable to commit job: " + commitExc.getMessage());
+        }
+      } else {
+        abortJob(jobContext, committer, false);
+      }
+    } finally {
+      // avoid config pollution with prefixed/suffixed keys
+      cleanCommitConfig(queryIdKey, tableName);
+    }
+  }
+
+  private void abortJob(JobContext jobContext, OutputCommitter committer, boolean suppressExc) throws MetaException {
+    try {
+      committer.abortJob(jobContext, JobStatus.State.FAILED);
+    } catch (IOException abortExc) {
+      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", abortExc);
+      if (!suppressExc) {
+        throw new MetaException("Unable to abort job: " + abortExc.getMessage());
+      }
+    }
+  }
+
+  private void cleanCommitConfig(String queryIdKey, String tableName) {
+    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID + "." + tableName);
+    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT + "." + tableName);
+    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
+    conf.unset(queryIdKey);
+  }
+
+  @Override
+  public void preInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite)
+      throws MetaException {
+    // do nothing
+  }
+
+  @Override
+  public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table, boolean overwrite)
+      throws MetaException {
+    // do nothing

Review comment:
       I didn't put it there because if there is an execution error we should 
get a non-zero return code from the Tez AM rather than an exception. Now that 
I think about it, though, I suppose Hive will throw an exception at the end if 
the return code was non-zero. I will test it to make sure.
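
For readers following the thread, the commit-or-abort flow in the hunk above can be sketched roughly as below. This is only an illustration in plain Java with no Hive or Hadoop dependencies: `CommitFlowSketch`, `Committer`, `RecordingCommitter` and `commitOrAbort` are hypothetical names introduced here, a `Map` stands in for the job configuration, and `IllegalStateException` stands in for `MetaException`. It assumes the execution result has already been recorded as a boolean flag under a per-table key, as the diff does with the `.result` key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitFlowSketch {

  /** Hypothetical stand-in for a job committer; NOT the Hadoop OutputCommitter API. */
  public interface Committer {
    void commitJob() throws Exception;
    void abortJob() throws Exception;
  }

  /** Illustrative committer that records calls and can be told to fail on commit. */
  public static class RecordingCommitter implements Committer {
    public final List<String> calls = new ArrayList<>();
    private final boolean failOnCommit;

    public RecordingCommitter(boolean failOnCommit) {
      this.failOnCommit = failOnCommit;
    }

    @Override
    public void commitJob() throws Exception {
      calls.add("commit");
      if (failOnCommit) {
        throw new Exception("simulated commit failure");
      }
    }

    @Override
    public void abortJob() {
      calls.add("abort");
    }
  }

  /**
   * Commits when the result flag is true, aborts otherwise, and always
   * removes the flag afterwards so the configuration is not polluted.
   */
  public static String commitOrAbort(Map<String, String> conf, String resultKey, Committer committer) {
    // A missing flag is treated as failure, mirroring getBoolean(key, false) in the diff.
    boolean success = Boolean.parseBoolean(conf.getOrDefault(resultKey, "false"));
    try {
      if (success) {
        try {
          committer.commitJob();
          return "committed";
        } catch (Exception commitExc) {
          // Commit failed: try to abort, but report the commit error rather than any abort error.
          abort(committer, true);
          throw new IllegalStateException("Unable to commit job: " + commitExc.getMessage());
        }
      }
      abort(committer, false);
      return "aborted";
    } finally {
      conf.remove(resultKey);
    }
  }

  private static void abort(Committer committer, boolean suppressExc) {
    try {
      committer.abortJob();
    } catch (Exception abortExc) {
      if (!suppressExc) {
        throw new IllegalStateException("Unable to abort job: " + abortExc.getMessage());
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put("query-1.db.tbl.result", "true");
    RecordingCommitter committer = new RecordingCommitter(false);
    System.out.println(commitOrAbort(conf, "query-1.db.tbl.result", committer) + " " + committer.calls);
  }
}
```

In this sketch the hook never sees the execution error itself, only the flag, which is why the question of whether the caller throws on a non-zero return code matters for deciding where the abort should live.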




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 585745)
    Time Spent: 2h  (was: 1h 50m)

> Commit Iceberg writes in HiveMetaHook instead of TezAM
> ------------------------------------------------------
>
>                 Key: HIVE-25006
>                 URL: https://issues.apache.org/jira/browse/HIVE-25006
>             Project: Hive
>          Issue Type: Task
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Trigger the write commits in the HiveIcebergStorageHandler#commitInsertTable. 
> This will enable us to implement insert overwrites for iceberg tables.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
