[ https://issues.apache.org/jira/browse/HIVE-22977?focusedWorklogId=841316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841316 ]
ASF GitHub Bot logged work on HIVE-22977:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jan/23 08:47
            Start Date: 24/Jan/23 08:47
    Worklog Time Spent: 10m
      Work Description: deniskuzZ commented on code in PR #3801:
URL: https://github.com/apache/hive/pull/3801#discussion_r1084960728


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java:
##########

@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.regex.Matcher;
+
+final class MergeCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MergeCompactor.class.getName());
+
+  @Override
+  public boolean run(CompactorContext context) throws IOException, HiveException, InterruptedException {
+    HiveConf hiveConf = context.getConf();
+    Table table = context.getTable();
+    AcidDirectory dir = context.getAcidDirectory();
+    ValidWriteIdList writeIds = context.getValidWriteIdList();
+    StorageDescriptor storageDescriptor = context.getSd();
+    CompactionInfo compactionInfo = context.getCompactionInfo();
+    if (isMergeCompaction(hiveConf, dir, writeIds, storageDescriptor)) {
+      // Only inserts happened, it is much more performant to merge the files than running a query
+      Path outputDirPath = getOutputDirPath(hiveConf, writeIds,
+          compactionInfo.isMajorCompaction(), storageDescriptor);
+      try {
+        return mergeFiles(hiveConf, compactionInfo.isMajorCompaction(),
+            dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters()));
+      } catch (Throwable t) {
+        // Error handling, just delete the output directory,
+        // and fall back to query based compaction.
+        FileSystem fs = outputDirPath.getFileSystem(hiveConf);
+        if (fs.exists(outputDirPath)) {
+          fs.delete(outputDirPath, true);
+        }
+        return false;
+      }
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Returns whether merge compaction must be enabled or not.
+   * @param conf Hive configuration
+   * @param directory the directory to be scanned
+   * @param validWriteIdList list of valid write IDs
+   * @param storageDescriptor storage descriptor of the underlying table
+   * @return true, if merge compaction must be enabled
+   */
+  private boolean isMergeCompaction(HiveConf conf, AcidDirectory directory,
+      ValidWriteIdList validWriteIdList,
+      StorageDescriptor storageDescriptor) {
+    return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_MERGE_COMPACTION_ENABLED)
+        && storageDescriptor.getOutputFormat().equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
+        && !hasDeleteOrAbortedDirectories(directory, validWriteIdList);
+  }
+
+  /**
+   * Scan a directory for delete deltas or aborted directories.
+   * @param directory the directory to be scanned
+   * @param validWriteIdList list of valid write IDs
+   * @return true, if delete or aborted directory found
+   */
+  private boolean hasDeleteOrAbortedDirectories(AcidDirectory directory, ValidWriteIdList validWriteIdList) {
+    if (!directory.getCurrentDirectories().isEmpty()) {
+      final long minWriteId = Optional.ofNullable(validWriteIdList.getMinOpenWriteId()).orElse(1L);
+      final long maxWriteId = validWriteIdList.getHighWatermark();
+      return directory.getCurrentDirectories().stream()
+          .filter(AcidUtils.ParsedDeltaLight::isDeleteDelta)
+          .filter(delta -> delta.getMinWriteId() >= minWriteId)
+          .anyMatch(delta -> delta.getMaxWriteId() <= maxWriteId) || !directory.getAbortedDirectories().isEmpty();
+    }
+    return true;
+  }
+
+  /**
+   * Collect the list of all bucket file paths which belong to the same bucket Id. This method scans all the base
+   * and delta dirs.
+   * @param conf hive configuration, must not be null
+   * @param dir the root directory of delta dirs
+   * @param includeBaseDir true, if the base directory should be scanned
+   * @param isMm true, if the table is an insert only table
+   * @return map of bucket ID -> bucket files
+   * @throws IOException an error happened during the reading of the directory/bucket file
+   */
+  private Map<Integer, List<Reader>> matchBucketIdToBucketFiles(HiveConf conf, AcidDirectory dir,
+      boolean includeBaseDir, boolean isMm) throws IOException {
+    Map<Integer, List<Reader>> result = new HashMap<>();
+    if (includeBaseDir && dir.getBaseDirectory() != null) {
+      result.putAll(getBucketFiles(conf, dir.getBaseDirectory(), isMm));
+    }
+    for (AcidUtils.ParsedDelta deltaDir : dir.getCurrentDirectories()) {
+      Path deltaDirPath = deltaDir.getPath();
+      Map<Integer, List<Reader>> intermediateResult = getBucketFiles(conf, deltaDirPath, isMm);
+      intermediateResult.forEach((k, v) -> {
+        if (result.containsKey(k)) {
+          result.get(k).addAll(v);
+        } else {
+          result.put(k, v);
+        }
+      });
+    }
+    return result;
+  }
+
+  /**
+   * Collect the list of all bucket file paths which belong to the same bucket Id. This method checks only one
+   * directory.
+   * @param conf hive configuration, must not be null
+   * @param dirPath the directory to be scanned.
+   * @param isMm collect bucket files from insert only directories
+   * @throws IOException an error happened during the reading of the directory/bucket file
+   */
+  private Map<Integer, List<Reader>> getBucketFiles(HiveConf conf, Path dirPath, boolean isMm) throws IOException {
+    Map<Integer, List<Reader>> bucketIdToBucketFilePath = new HashMap<>();
+    FileSystem fs = dirPath.getFileSystem(conf);
+    FileStatus[] fileStatuses =
+        fs.listStatus(dirPath, isMm ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter);
+    for (FileStatus f : fileStatuses) {
+      final Path fPath = f.getPath();
+      Matcher matcher = isMm ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN
+          .matcher(fPath.getName()) : AcidUtils.BUCKET_PATTERN.matcher(fPath.getName());
+      if (!matcher.find()) {
+        String errorMessage = String
+            .format("Found a bucket file which did not match the bucket pattern! %s Matcher=%s", fPath,
+                matcher);
+        LOG.error(errorMessage);
+        throw new IllegalArgumentException(errorMessage);
+      }
+      int bucketNum = matcher.groupCount() > 0 ? Integer.parseInt(matcher.group(1)) : Integer.parseInt(matcher.group());
+      bucketIdToBucketFilePath.computeIfAbsent(bucketNum, ArrayList::new);
+      Reader reader = OrcFile.createReader(fs, fPath);
+      bucketIdToBucketFilePath.computeIfPresent(bucketNum, (k, v) -> v).add(reader);
+    }
+    return bucketIdToBucketFilePath;
+  }
+
+  /**
+   * Generate output path for compaction. This can be used to generate delta or base directories.
+   * @param conf hive configuration, must be non-null
+   * @param writeIds list of valid write IDs
+   * @param isBaseDir if base directory path should be generated
+   * @param sd the resolved storage descriptor
+   * @return output path, always non-null
+   */
+  private Path getOutputDirPath(HiveConf conf, ValidWriteIdList writeIds, boolean isBaseDir,
+      StorageDescriptor sd) {
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWatermark = writeIds.getHighWatermark();
+    long compactorTxnId = Compactor.getCompactorTxnId(conf);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(isBaseDir)
+        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)

Review Comment:
   could you please format this for readability



Issue Time Tracking
-------------------

    Worklog Id:     (was: 841316)
    Time Spent: 8h 40m  (was: 8.5h)

> Merge delta files instead of running a query in major/minor compaction
> ----------------------------------------------------------------------
>
>                 Key: HIVE-22977
>                 URL: https://issues.apache.org/jira/browse/HIVE-22977
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: László Pintér
>            Assignee: Sourabh Badhya
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-22977.01.patch, HIVE-22977.02.patch
>
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> [Compaction Optimization]
> We should analyse the possibility of merging delta files instead of running a
> major/minor compaction query.
> Please consider the following use cases:
> - full acid table but only insert queries were run. This means that no
> delete delta directories were created. Is it possible to merge the delta
> directory contents without running a compaction query?
> - full acid table, initiating queries through the streaming API. If there
> are no aborted transactions during streaming, is it possible to merge the
> delta directory contents without running a compaction query?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
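
For reference, one readable layout of the chained builder call flagged in the review comment above could look like the sketch below. Only the option calls visible in the quoted diff are shown; the statement is truncated there, so the remaining calls and the closing of the chain are omitted.

    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
        .writingBase(isBaseDir)
        .writingDeleteDelta(false)
        .isCompressed(false)
        .minimumWriteId(minOpenWriteId)
        // ... remaining option calls are truncated in the quoted diff ...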
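
As an illustration of the eligibility idea described in the issue text (merge files directly only for insert-only workloads), here is a minimal sketch. The helper class and method names are hypothetical, not part of Hive or this PR, and the write-id range filtering performed by MergeCompactor#hasDeleteOrAbortedDirectories is omitted; only API calls that appear in the quoted diff are used.

    import org.apache.hadoop.hive.ql.io.AcidDirectory;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    // Hypothetical helper: merging bucket files directly is only considered when the
    // ACID directory holds insert deltas exclusively, i.e. no delete deltas and no
    // aborted directories. Otherwise the regular query-based compaction must run.
    final class MergeEligibilitySketch {
      static boolean canMergeInsteadOfQuery(AcidDirectory dir) {
        boolean hasDeleteDeltas = dir.getCurrentDirectories().stream()
            .anyMatch(AcidUtils.ParsedDeltaLight::isDeleteDelta);
        boolean hasAbortedDirs = !dir.getAbortedDirectories().isEmpty();
        return !hasDeleteDeltas && !hasAbortedDirs;
      }
    }

In the actual patch this decision additionally requires ORC output format and the merge-compaction config flag, as shown in isMergeCompaction in the diff above.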