[ https://issues.apache.org/jira/browse/HIVE-26319?focusedWorklogId=782338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-782338 ]
ASF GitHub Bot logged work on HIVE-26319:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jun/22 09:26
            Start Date: 17/Jun/22 09:26
    Worklog Time Spent: 10m
      Work Description: kasakrisz commented on code in PR #3362:
URL: https://github.com/apache/hive/pull/3362#discussion_r899942578


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java:
##########
@@ -411,23 +411,27 @@ public boolean commitInMoveTask() {
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
+    Optional<List<JobContext>> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (!jobContextList.isPresent()) {
+      return;
+    }
+
+    for (JobContext jobContext : jobContextList.get()) {
       OutputCommitter committer = new HiveIcebergOutputCommitter();
       try {
-        committer.commitJob(jobContext.get());
+        committer.commitJob(jobContext);
       } catch (Throwable e) {
         // Aborting the job if the commit has failed
         LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
+            jobContext.getJobID(), tableName, e);
         try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
+          committer.abortJob(jobContext, JobStatus.State.FAILED);

Review Comment:
   I think all jobs should be rolled back if committing any of them fails. To achieve this we are using `org.apache.iceberg.util.Tasks`:
   ```
   Tasks.foreach(outputs)
       .throwFailureWhenFinished()
       .stopOnFailure()
       .run(output -> {
   ...
   ```
   which can revert all tasks in case of error, even those that have already succeeded.
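The commit-all-or-revert-all semantics described in the review comment can be sketched without the Iceberg dependency. The following is a minimal, hypothetical stand-in for what `org.apache.iceberg.util.Tasks` provides with `throwFailureWhenFinished().stopOnFailure()`; the real utility also supports parallelism, retries, and other options, and the class and method names here are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified sketch of the all-or-nothing commit pattern: commit each output
// in order; on the first failure, revert every output that already committed,
// then rethrow so the caller sees the failure.
public class AllOrNothingCommit {

  static <T> void commitAll(List<T> outputs, Consumer<T> commit, Consumer<T> revert) {
    List<T> committed = new ArrayList<>();
    for (T output : outputs) {
      try {
        commit.accept(output);
        committed.add(output);
      } catch (RuntimeException e) {
        // Roll back the outputs that already succeeded before propagating.
        for (T done : committed) {
          revert.accept(done);
        }
        throw e;
      }
    }
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    try {
      commitAll(List.of("job1", "job2", "job3"),
          job -> {
            if (job.equals("job3")) {
              throw new RuntimeException("commit failed for " + job);
            }
            log.add("committed " + job);
          },
          job -> log.add("reverted " + job));
    } catch (RuntimeException e) {
      log.add("aborted: " + e.getMessage());
    }
    // job1 and job2 commit, job3 fails, so job1 and job2 are reverted.
    log.forEach(System.out::println);
  }
}
```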
The initial implementation committed each job independently: every job launched a separate batch of tasks. I refactored this part to collect the outputs from all jobs and commit them in a single batch.

I also found that the commit runs in parallel and looks up the data it needs in the `SessionState`, which is stored in a thread local. I observed that this works only when a single output exists, because then only one worker thread is used and it is the main thread, where the `SessionState` is initialized. However, when a batch contains more than one output, threads other than the main thread do not have the data required for the commit in their `SessionState`. So I extracted this data before launching the tasks.

This affects multi-inserts, split updates, and merge statements. I haven't found any tests for multi-inserting into an Iceberg table (please share some if any exist), so I assume this issue hasn't come up before. Please share your thoughts.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 782338)
    Time Spent: 2h  (was: 1h 50m)

> Iceberg integration: Perform update split early
> -----------------------------------------------
>
>                 Key: HIVE-26319
>                 URL: https://issues.apache.org/jira/browse/HIVE-26319
>             Project: Hive
>          Issue Type: Improvement
>          Components: File Formats
>            Reporter: Krisztian Kasa
>            Assignee: Krisztian Kasa
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Extend update split early to iceberg tables like in HIVE-21160 for native
> acid tables

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
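The `SessionState` problem described in the comment above stems from a general property of `java.lang.ThreadLocal`: a value set on the main thread is invisible to pool worker threads. The sketch below is not Hive code; it illustrates the failure mode and the fix (capturing the needed value before submitting the task) under that assumption, with illustrative names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration of the thread-local visibility issue: a ThreadLocal set on the
// main thread reads as null from an executor worker thread. Capturing the
// value on the main thread before submitting the task avoids the problem.
public class ThreadLocalVisibility {
  static final ThreadLocal<String> SESSION = new ThreadLocal<>();

  public static void main(String[] args) throws Exception {
    SESSION.set("commit-config");  // initialized on the main thread only

    ExecutorService pool = Executors.newSingleThreadExecutor();

    // Worker thread: the ThreadLocal was never set there, so it reads null.
    Callable<String> readSession = SESSION::get;
    String seenByWorker = pool.submit(readSession).get();

    // Fix: capture the value on the main thread and pass it into the task.
    String captured = SESSION.get();
    String seenWithCapture = pool.submit(() -> captured).get();

    pool.shutdown();
    System.out.println("worker sees: " + seenByWorker);        // null
    System.out.println("captured value: " + seenWithCapture);  // commit-config
  }
}
```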