[ https://issues.apache.org/jira/browse/HIVE-23559?focusedWorklogId=831772&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-831772 ]
ASF GitHub Bot logged work on HIVE-23559: ----------------------------------------- Author: ASF GitHub Bot Created on: 07/Dec/22 13:54 Start Date: 07/Dec/22 13:54 Worklog Time Spent: 10m Work Description: difin commented on code in PR #3795: URL: https://github.com/apache/hive/pull/3795#discussion_r1042234444 ########## ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java: ########## @@ -5208,55 +5208,94 @@ private static void moveAcidFiles(String deltaFileType, PathFilter pathFilter, F } LOG.debug("Acid move found " + deltaStats.length + " " + deltaFileType + " files"); + List<Future<Void>> futures = new LinkedList<>(); + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? + Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Acid-Files-Thread-%d").build()) : null; + for (FileStatus deltaStat : deltaStats) { - Path deltaPath = deltaStat.getPath(); - // Create the delta directory. Don't worry if it already exists, - // as that likely means another task got to it first. Then move each of the buckets. - // it would be more efficient to try to move the delta with it's buckets but that is - // harder to make race condition proof. - Path deltaDest = new Path(dst, deltaPath.getName()); - try { - if (!createdDeltaDirs.contains(deltaDest)) { - try { - if(fs.mkdirs(deltaDest)) { - try { - fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()), - AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest)); - } catch (FileNotFoundException fnf) { - // There might be no side file. Skip in this case. 
- } + + if (null == pool) { + moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat); + } else { + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws HiveException { + try { + moveAcidFilesForDelta(deltaFileType, fs, dst, createdDeltaDirs, newFiles, deltaStat); + } catch (Exception e) { + final String poolMsg = + "Unable to move source " + deltaStat.getPath().getName() + " to destination " + dst.getName(); + throw getHiveException(e, poolMsg); } - createdDeltaDirs.add(deltaDest); - } catch (IOException swallowIt) { - // Don't worry about this, as it likely just means it's already been created. - LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest + - ", assuming it already exists: " + swallowIt.getMessage()); + return null; } + })); + } + } + + if (null != pool) { + pool.shutdown(); + for (Future<Void> future : futures) { + try { + future.get(); + } catch (Exception e) { + throw handlePoolException(pool, e); } - FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); - LOG.debug("Acid move found " + bucketStats.length + " bucket files"); - for (FileStatus bucketStat : bucketStats) { - Path bucketSrc = bucketStat.getPath(); - Path bucketDest = new Path(deltaDest, bucketSrc.getName()); - final String msg = "Unable to move source " + bucketSrc + " to destination " + - bucketDest; - LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " + - bucketDest.toUri().toString()); - try { - fs.rename(bucketSrc, bucketDest); - if (newFiles != null) { - newFiles.add(bucketDest); + } + } + } + + private static void moveAcidFilesForDelta(String deltaFileType, FileSystem fs, + Path dst, Set<Path> createdDeltaDirs, + List<Path> newFiles, FileStatus deltaStat) throws HiveException { + + Path deltaPath = deltaStat.getPath(); + // Create the delta directory. Don't worry if it already exists, + // as that likely means another task got to it first. 
Then move each of the buckets. + // it would be more efficient to try to move the delta with it's buckets but that is + // harder to make race condition proof. + Path deltaDest = new Path(dst, deltaPath.getName()); + try { + if (!createdDeltaDirs.contains(deltaDest)) { + try { + if(fs.mkdirs(deltaDest)) { + try { + fs.rename(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaStat.getPath()), + AcidUtils.OrcAcidVersion.getVersionFilePath(deltaDest)); + } catch (FileNotFoundException fnf) { + // There might be no side file. Skip in this case. } - } catch (Exception e) { - throw getHiveException(e, msg); } + createdDeltaDirs.add(deltaDest); + } catch (IOException swallowIt) { + // Don't worry about this, as it likely just means it's already been created. + LOG.info("Unable to create " + deltaFileType + " directory " + deltaDest + + ", assuming it already exists: " + swallowIt.getMessage()); } - } catch (IOException e) { - throw new HiveException("Error moving acid files " + e.getMessage(), e); } + FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter); + LOG.debug("Acid move found " + bucketStats.length + " bucket files"); + for (FileStatus bucketStat : bucketStats) { + Path bucketSrc = bucketStat.getPath(); + Path bucketDest = new Path(deltaDest, bucketSrc.getName()); + final String msg = "Unable to move source " + bucketSrc + " to destination " + Review Comment: Done Issue Time Tracking ------------------- Worklog Id: (was: 831772) Time Spent: 1.5h (was: 1h 20m) > Optimise Hive::moveAcidFiles for cloud storage > ---------------------------------------------- > > Key: HIVE-23559 > URL: https://issues.apache.org/jira/browse/HIVE-23559 > Project: Hive > Issue Type: Improvement > Reporter: Rajesh Balamohan > Assignee: Dmitriy Fingerman > Priority: Major > Labels: pull-request-available > Time Spent: 1.5h > Remaining Estimate: 0h > > [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L4752] > It 
ends up transferring DELTA, DELETE_DELTA, BASE prefixes sequentially from > staging to the final location. > This causes delays even with simple update statements, which update only a small > number of records in cloud storage. -- This message was sent by Atlassian Jira (v8.20.10#820010)