[ 
https://issues.apache.org/jira/browse/HIVE-23559?focusedWorklogId=831201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-831201
 ]

ASF GitHub Bot logged work on HIVE-23559:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/22 21:51
            Start Date: 05/Dec/22 21:51
    Worklog Time Spent: 10m 
      Work Description: ramesh0201 commented on code in PR #3795:
URL: https://github.com/apache/hive/pull/3795#discussion_r1040134949


##########
ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java:
##########
@@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType, 
PathFilter pathFilter, F
     }
     LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " 
files");
 
+    List<Future<Void>> futures = new LinkedList<>();
+    final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+            
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
+                    new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build())
 : null;
+
     for (FileStatus deltaStat : deltaStats) {
-      Path deltaPath = deltaStat.getPath();
-      // Create the delta directory.  Don't worry if it already exists,
-      // as that likely means another task got to it first.  Then move each of 
the buckets.
-      // it would be more efficient to try to move the delta with it's buckets 
but that is
-      // harder to make race condition proof.
-      Path deltaDest = new Path(dst, deltaPath.getName());
-      try {
-        if (!createdDeltaDirs.contains(deltaDest)) {
-          try {
-            if(fs.mkdirs(deltaDest)) {
-              try {
-                
fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
-                    AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
-              } catch (FileNotFoundException fnf) {
-                // There might be no side file. Skip in this case.
-              }
+
+      if (null == pool) {
+        moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, 
newFiles, deltaStat);
+      } else {
+        futures.add(pool.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws HiveException {
+            try {
+              moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, 
newFiles, deltaStat);
+            } catch (Exception e) {
+              final String poolMsg =
+                      "Unable to move source " + deltaStat.getPath().getName() 
+ " to destination " + dst.getName();
+              throw getHiveException(e, poolMsg);
             }
-            createdDeltaDirs.add(deltaDest);
-          } catch (IOException swallowIt) {
-            // Don't worry about this, as it likely just means it's already 
been created.
-            LOG.info("Unable to create " + deltaFileType + " directory " + 
deltaDest +
-                ", assuming it already exists: " + swallowIt.getMessage());
+            return null;
           }
+        }));
+      }
+    }
+
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          throw handlePoolException(pool, e);
         }
-        FileStatus[] bucketStats = fs.listStatus(deltaPath, 
AcidUtils.bucketFileFilter);
-        LOG.debug("Acid move found " + bucketStats.length + " bucket files");
-        for (FileStatus bucketStat : bucketStats) {
-          Path bucketSrc = bucketStat.getPath();
-          Path bucketDest = new Path(deltaDest, bucketSrc.getName());
-          final String msg = "Unable to move source " + bucketSrc + " to 
destination " +
-              bucketDest;
-          LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
-              bucketDest.toUri().toString());
-          try {
-            fs.rename(bucketSrc, bucketDest);
-            if (newFiles != null) {
-              newFiles.add(bucketDest);
+      }
+    }
+  }
+
+  private static void moveAcidFilesForDelta(String deltaFileType, FileSystem 
fs,
+                                            Path dst, Set<Path> 
createdDeltaDirs,
+                                            List<Path> newFiles, FileStatus 
deltaStat) throws HiveException {
+
+    Path deltaPath = deltaStat.getPath();
+    // Create the delta directory.  Don't worry if it already exists,
+    // as that likely means another task got to it first.  Then move each of 
the buckets.
+    // it would be more efficient to try to move the delta with it's buckets 
but that is
+    // harder to make race condition proof.
+    Path deltaDest = new Path(dst, deltaPath.getName());
+    try {
+      if (!createdDeltaDirs.contains(deltaDest)) {
+        try {
+          if(fs.mkdirs(deltaDest)) {
+            try {
+              
fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()),
+                      AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest));
+            } catch (FileNotFoundException fnf) {
+              // There might be no side file. Skip in this case.
             }
-          } catch (Exception e) {
-            throw getHiveException(e, msg);
           }
+          createdDeltaDirs.add(deltaDest);
+        } catch (IOException swallowIt) {
+          // Don't worry about this, as it likely just means it's already been 
created.
+          LOG.info("Unable to create " + deltaFileType + " directory " + 
deltaDest +
+                  ", assuming it already exists: " + swallowIt.getMessage());
         }
-      } catch (IOException e) {
-        throw new HiveException("Error moving acid files " + e.getMessage(), 
e);
       }
+      FileStatus[] bucketStats = fs.listStatus(deltaPath, 
AcidUtils.bucketFileFilter);
+      LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+      for (FileStatus bucketStat : bucketStats) {
+        Path bucketSrc = bucketStat.getPath();
+        Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+        final String msg = "Unable to move source " + bucketSrc + " to 
destination " +

Review Comment:
   Minor comment: can we move this `msg` declaration into the catch block?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 831201)
    Time Spent: 40m  (was: 0.5h)

> Optimise Hive::moveAcidFiles for cloud storage
> ----------------------------------------------
>
>                 Key: HIVE-23559
>                 URL: https://issues.apache.org/jira/browse/HIVE-23559
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Dmitriy Fingerman
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L4752]
> It ends up transferring DELTA, DELETE_DELTA, and BASE prefixes sequentially from 
> staging to the final location.
> This causes delays even with simple update statements that update a small 
> number of records in cloud storage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to