[ 
https://issues.apache.org/jira/browse/HIVE-25195?focusedWorklogId=606605&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-606605
 ]

ASF GitHub Bot logged work on HIVE-25195:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jun/21 08:03
            Start Date: 04/Jun/21 08:03
    Worklog Time Spent: 10m 
      Work Description: pvary commented on a change in pull request #2347:
URL: https://github.com/apache/hive/pull/2347#discussion_r644897608



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Is this a bugfix?
   Do we expect null here?

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -437,19 +438,13 @@ public void commitInsertTable(org.apache.hadoop.hive.metastore.api.Table table,
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    boolean failure = false;
-    try {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
-      committer.commitJob(jobContext);
-    } catch (Exception e) {
-      failure = true;
-      LOG.error("Error while trying to commit job", e);
-      throw new MetaException(StringUtils.stringifyException(e));
-    } finally {
-      // if there's a failure, the configs will still be needed in rollbackInsertTable
-      if (!failure) {
-        // avoid config pollution with prefixed/suffixed keys
-        cleanCommitConfig(tableName);
+    if (jobContext != null) {

Review comment:
       Oh... I found the reason behind it 😄 

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
##########
@@ -459,35 +454,35 @@ public void rollbackInsertTable(org.apache.hadoop.hive.metastore.api.Table table
       throws MetaException {
     String tableName = TableIdentifier.of(table.getDbName(), table.getTableName()).toString();
     JobContext jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
-    OutputCommitter committer = new HiveIcebergOutputCommitter();
-    try {
-      LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
-      committer.abortJob(jobContext, JobStatus.State.FAILED);
-    } catch (IOException e) {
-      LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
-      // no throwing here because the original commitInsertTable exception should be propagated
-    } finally {
-      // avoid config pollution with prefixed/suffixed keys
-      cleanCommitConfig(tableName);
+    if (jobContext != null) {
+      OutputCommitter committer = new HiveIcebergOutputCommitter();
+      try {
+        LOG.info("rollbackInsertTable: Aborting job for jobID: {} and table: {}", jobContext.getJobID(), tableName);
+        committer.abortJob(jobContext, JobStatus.State.FAILED);
+      } catch (IOException e) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", e);
+        // no throwing here because the original commitInsertTable exception should be propagated
+      }
     }
   }
 
-  private void cleanCommitConfig(String tableName) {
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_JOB_ID_PREFIX + tableName);
-    conf.unset(TezTask.HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + tableName);
-    conf.unset(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableName);
-    conf.unset(InputFormatConfig.OUTPUT_TABLES);
-  }
-
   private JobContext getJobContextForCommitOrAbort(String tableName, boolean overwrite) {

Review comment:
       Maybe Optional?
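   A sketch of how the call site could look if the method returned `Optional<JobContext>` instead of a nullable reference (hypothetical, reusing the commit logic from the diff above):

```java
// hypothetical: getJobContextForCommitOrAbort returns Optional.empty()
// instead of null when there is no commit info for the table
Optional<JobContext> jobContext = getJobContextForCommitOrAbort(tableName, overwrite);
if (jobContext.isPresent()) {
  try {
    new HiveIcebergOutputCommitter().commitJob(jobContext.get());
  } catch (Exception e) {
    LOG.error("Error while trying to commit job", e);
    throw new MetaException(StringUtils.stringifyException(e));
  }
}
```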

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
##########
@@ -183,10 +183,9 @@ private void createTableForCTAS(Configuration configuration, Properties serDePro
         serDeProperties.get(Catalogs.NAME), tableSchema, serDeProperties.get(InputFormatConfig.PARTITION_SPEC));
     Catalogs.createTable(configuration, serDeProperties);
 
-    // set these in the global conf so that we can rollback the table in the lifecycle hook in case of failures
-    String queryId = configuration.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    configuration.set(String.format(InputFormatConfig.IS_CTAS_QUERY_TEMPLATE, queryId), "true");
-    configuration.set(String.format(InputFormatConfig.CTAS_TABLE_NAME_TEMPLATE, queryId),
+    // set these in the query state so that we can rollback the table in the lifecycle hook in case of failures
+    SessionStateUtil.addResource(configuration, InputFormatConfig.IS_CTAS_QUERY, "true");

Review comment:
       Do we need both?
   Maybe if we have `CTAS_TABLE_NAME`, then `IS_CTAS_QUERY` is implicitly `true`?
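   E.g. a sketch of collapsing the two (assumes `InputFormatConfig.CTAS_TABLE_NAME` is the new key and that `SessionStateUtil` has a matching getter; the names are illustrative):

```java
// writer side: store only the table name; its presence doubles as the CTAS flag
SessionStateUtil.addResource(configuration, InputFormatConfig.CTAS_TABLE_NAME,
    String.valueOf(serDeProperties.get(Catalogs.NAME)));

// reader side (e.g. in the lifecycle hook): it is a CTAS query iff the name is present
boolean isCtasQuery = SessionStateUtil
    .getProperty(configuration, InputFormatConfig.CTAS_TABLE_NAME)
    .isPresent();
```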

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
               // get all target tables this vertex wrote to
               List<String> tables = new ArrayList<>();
               for (Map.Entry<String, String> entry : jobConf) {

Review comment:
       Do we have a faster solution for this? The `jobConf` could be very, very big.
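   One option, assuming the Hadoop version in use ships `Configuration#getPropsWithPrefix` (a sketch; it still scans the config internally, so it is mostly a readability win):

```java
// getPropsWithPrefix returns the matching entries with the prefix already
// stripped from the keys, so the manual substring() calls go away
List<String> tables = new ArrayList<>(
    jobConf.getPropsWithPrefix(ICEBERG_SERIALIZED_TABLE_PREFIX).keySet());
```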

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
              // get all target tables this vertex wrote to
              List<String> tables = new ArrayList<>();
              for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {

Review comment:
       We iterated through the jobConf a few lines ago. It might be worth merging the loops, if all else fails.
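   A sketch of the merged version (same logic as the two loops in the diff, one iteration):

```java
// single pass over jobConf: collect the target table names and the iceberg
// properties needed at commit time, skipping the bulky serialized tables
List<String> tables = new ArrayList<>();
Map<String, String> icebergProperties = new HashMap<>();
for (Map.Entry<String, String> entry : jobConf) {
  String key = entry.getKey();
  if (key.startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
    tables.add(key.substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
  } else if (key.startsWith("iceberg.mr.")) {
    icebergProperties.put(key, entry.getValue());
  }
}
```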

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
##########
@@ -384,24 +382,28 @@ private void collectCommitInformation(TezWork work) throws IOException, TezExcep
              // get all target tables this vertex wrote to
              List<String> tables = new ArrayList<>();
              for (Map.Entry<String, String> entry : jobConf) {
-                if (entry.getKey().startsWith("iceberg.mr.serialized.table.")) {
-                  tables.add(entry.getKey().substring("iceberg.mr.serialized.table.".length()));
+                if (entry.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  tables.add(entry.getKey().substring(ICEBERG_SERIALIZED_TABLE_PREFIX.length()));
                 }
               }
-              // save information for each target table (jobID, task num, query state)
+              // find iceberg props in jobConf as they can be needed, but not available, during job commit
+              Map<String, String> icebergProperties = new HashMap<>();
+              jobConf.forEach(e -> {
+                // don't copy the serialized tables, they're not needed anymore and take up lots of space
+                if (e.getKey().startsWith("iceberg.mr.") && !e.getKey().startsWith(ICEBERG_SERIALIZED_TABLE_PREFIX)) {
+                  icebergProperties.put(e.getKey(), e.getValue());
+                }
+              });
+              // save information for each target table (jobID, task num)
               for (String table : tables) {
-                sessionConf.set(HIVE_TEZ_COMMIT_JOB_ID_PREFIX + table, jobIdStr);
-                sessionConf.setInt(HIVE_TEZ_COMMIT_TASK_COUNT_PREFIX + table,
-                    status.getProgress().getSucceededTaskCount());
+                SessionStateUtil.newCommitInfo(jobConf, table)

Review comment:
       This is a little bit odd to me.
   I mean, I understand what you did, but it still feels strange. Convince me 😄
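   For comparison, a flatter alternative could reuse the `SessionStateUtil.addResource` call from the SerDe change above (the key names here are made up for illustration):

```java
// hypothetical flat keys in the query state instead of a builder-style commit info
SessionStateUtil.addResource(jobConf, "commit.jobid." + table, jobIdStr);
SessionStateUtil.addResource(jobConf, "commit.taskcount." + table,
    String.valueOf(status.getProgress().getSucceededTaskCount()));
```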




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 606605)
    Time Spent: 2h 10m  (was: 2h)

> Store Iceberg write commit and ctas information in QueryState 
> --------------------------------------------------------------
>
>                 Key: HIVE-25195
>                 URL: https://issues.apache.org/jira/browse/HIVE-25195
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Marton Bod
>            Assignee: Marton Bod
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> We should replace the current method of passing Iceberg write commit-related 
> information (jobID, task num) and CTAS info via the session conf using 
> prefixed keys. We have a new way of doing that more cleanly, using the 
> QueryState object. This should make the code easier to maintain and guard 
> against accidental session conf pollution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
