[ https://issues.apache.org/jira/browse/HIVE-24852?focusedWorklogId=592953&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-592953 ]
ASF GitHub Bot logged work on HIVE-24852: ----------------------------------------- Author: ASF GitHub Bot Created on: 04/May/21 19:03 Start Date: 04/May/21 19:03 Worklog Time Spent: 10m Work Description: ayushtkn commented on a change in pull request #2043: URL: https://github.com/apache/hive/pull/2043#discussion_r626032642 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/SnapshotUtils.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.repl.util; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.SnapshotException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.exec.util.Retryable; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_EXTERNAL_TABLE_PATHS; +import static org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask.createTableFileList; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.externalTableDataPath; +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.getExternalTableBaseDir; + +/** + * Utility class for snapshot related operations. + */ +public class SnapshotUtils { + + private static final transient Logger LOG = LoggerFactory.getLogger(SnapshotUtils.class); + + public static final String OLD_SNAPSHOT = "replOld"; + public static final String NEW_SNAPSHOT = "replNew"; + + /** + * Gets a DistributedFileSystem object if possible from a path. + * @param path path from which DistributedFileSystem needs to be extracted. + * @param conf Hive Configuration. + * @return DFS or null. 
+ */ + public static DistributedFileSystem getDFS(Path path, HiveConf conf) throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (fs instanceof DistributedFileSystem) { + return (DistributedFileSystem) fs; + } else { + LOG.error("FileSystem for {} is not DistributedFileSystem", path); + throw new IOException("The filesystem for path {} is {}, The filesystem should be DistributedFileSystem to " + + "support snapshot based copy."); + } + } + + /** + * Checks whether a given snapshot exists or not. + * @param dfs DistributedFileSystem. + * @param path path of snapshot. + * @param snapshotPrefix snapshot name prefix. + * @param snapshotName name of snapshot. + * @param conf Hive configuration. + * @return true if the snapshot exists. + * @throws IOException in case of any error. + */ + public static boolean isSnapshotAvailable(DistributedFileSystem dfs, Path path, String snapshotPrefix, + String snapshotName, HiveConf conf) throws IOException { + AtomicBoolean isSnapAvlb = new AtomicBoolean(false); + Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class) + .withFailOnException(SnapshotException.class).build(); + try { + retryable.executeCallable(() -> { + isSnapAvlb + .set(dfs.exists(new Path(path, HdfsConstants.DOT_SNAPSHOT_DIR + "/" + snapshotPrefix + snapshotName))); + LOG.debug("Snapshot for path {} is {}", path, isSnapAvlb.get() ? "available" : "unavailable"); + return null; + }); + } catch (Exception e) { + throw new SnapshotException("Failed to check if snapshot is available on " + path, e); + } + return isSnapAvlb.get(); + } + + /** + * Attempts deleting a snapshot, if exists. + * @param dfs DistributedFileSystem + * @param snapshotPath path of snapshot + * @param snapshotName name of the snapshot + * @return true if the snapshot gets deleted. 
+ */ + public static boolean deleteSnapshotSafe(DistributedFileSystem dfs, Path snapshotPath, String snapshotName) + throws IOException { + try { + dfs.deleteSnapshot(snapshotPath, snapshotName); + return true; + } catch (SnapshotException e) { + LOG.debug("Couldn't delete the snapshot {} under path {}", snapshotName, snapshotPath, e); + } catch (FileNotFoundException fnf) { + LOG.warn("Couldn't delete the snapshot {} under path {}", snapshotName, snapshotPath, fnf); + } + return false; + } + + /** + * Attempts to disallow snapshot on a path, ignoring exceptions. + * @param dfs DistributedFileSystem + * @param snapshotPath path of snapshot + */ + public static void disallowSnapshot(DistributedFileSystem dfs, Path snapshotPath) { + try { + // Check if the directory is snapshottable. + if (dfs.getFileStatus(snapshotPath).isSnapshotEnabled()) { + dfs.disallowSnapshot(snapshotPath); + } + } catch (Exception e) { + LOG.warn("Could not disallow snapshot for path {}", snapshotPath, e); + } + } + + /** + * Attempts to allow snapshots on the path, with retry. + * @param dfs DistributedFileSystem. + * @param snapshotPath path of snapshot. + * @param conf Hive Configuration. + */ + public static void allowSnapshot(DistributedFileSystem dfs, Path snapshotPath, HiveConf conf) throws IOException { + // Check if the directory is already snapshottable. + Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class) + .withFailOnException(SnapshotException.class).build(); + try { + retryable.executeCallable(() -> { + try { + if (!dfs.getFileStatus(snapshotPath).isSnapshotEnabled()) { + dfs.allowSnapshot(snapshotPath); + } + } catch (FileNotFoundException fnf) { + // Source got deleted, we can ignore. 
+ LOG.info("Failed to allow snapshot for {} since the path got deleted", snapshotPath); + } + return null; + }); + } catch (Exception e) { + throw new SnapshotException("Failed to AllowSnapshot on " + snapshotPath, e); + } + } + + /** + * Specifies the mode of copy when using Snapshots for replication. + */ + public enum SnapshotCopyMode { + INITIAL_COPY, // Copying for the first time, only one snapshot exists. + DIFF_COPY, // Copying when a pair of snapshots are present + FALLBACK_COPY // Symbolizes that normal copy needs to be used, That is without use of snapshots. + } + + /** + * Attempts creating a snapshot, with retries. + * @param fs FileSystem object. + * @param snapshotPath Path of the snapshot. + * @param snapshotName Name of the snapshot. + * @param conf Hive Configuration. + */ + public static void createSnapshot(FileSystem fs, Path snapshotPath, String snapshotName, HiveConf conf) + throws IOException { + Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class) + .withFailOnException(SnapshotException.class).build(); + try { + retryable.executeCallable(() -> { + try { + fs.createSnapshot(snapshotPath, snapshotName); + } catch (FileNotFoundException e) { + LOG.warn("Couldn't create the snapshot {} under path {}", snapshotName, snapshotPath, e); + } + return null; + }); + } catch (Exception e) { + throw new SnapshotException( + "Unable to create snapshot for path: " + snapshotPath + " snapshot name: " + snapshotName, e); + } + } + + /** + * Renames a snapshot with retries. + * @param fs the filesystem. + * @param snapshotPath the path where the snapshot lies. + * @param sourceSnapshotName current snapshot name. + * @param targetSnapshotName new snapshot name. + * @param conf configuration. + * @throws IOException in case of failure. 
+ */ + public static void renameSnapshot(FileSystem fs, Path snapshotPath, String sourceSnapshotName, + String targetSnapshotName, HiveConf conf) throws IOException { + Retryable retryable = Retryable.builder().withHiveConf(conf).withRetryOnException(IOException.class) + .withFailOnException(SnapshotException.class).build(); + try { + retryable.executeCallable(() -> { + try { + fs.renameSnapshot(snapshotPath, sourceSnapshotName, targetSnapshotName); + } catch (FileNotFoundException e) { + LOG.warn("Couldn't rename the snapshot {} to {} under path {}", sourceSnapshotName, targetSnapshotName, + snapshotPath, e); + } + return null; + }); + } catch (Exception e) { + throw new SnapshotException( + "Unable to rename snapshot " + sourceSnapshotName + " to " + targetSnapshotName + " for path: " + + snapshotPath, e); + } + } + + /** + * Extracts the entries from FileList into an ArrayList + * @param fl the FileList + * @return the ArrayList containing the entries. + */ + public static ArrayList<String> getListFromFileList(FileList fl) { + ArrayList<String> result = new ArrayList<>(); + while (fl.hasNext()) { + result.add(fl.next()); + } + return result; + } + + /** + * Deletes the snapshots present in the list. + * @param dfs DistributedFileSystem. + * @param diffList Elements to be deleted. + * @param prefix Prefix used in snapshot names, + * @param snapshotCount snapshot counter to track the number of snapshots deleted. + * @throws IOException in case of any error. 
+ */ + private static void cleanUpSnapshots(DistributedFileSystem dfs, ArrayList<String> diffList, String prefix, + ReplSnapshotCount snapshotCount) throws IOException { + for (String path : diffList) { + Path snapshotPath = new Path(path); + boolean isFirstDeleted = deleteSnapshotSafe(dfs, snapshotPath, firstSnapshot(prefix)); + boolean isSecondDeleted = deleteSnapshotSafe(dfs, snapshotPath, secondSnapshot(prefix)); + disallowSnapshot(dfs, snapshotPath); + if (snapshotCount != null) { + if (isFirstDeleted) { + snapshotCount.incrementNumDeleted(); + } + if (isSecondDeleted) { + snapshotCount.incrementNumDeleted(); + } + } + } + } + + private static ArrayList<String> getDiffList(ArrayList<String> newList, ArrayList<String> oldList, HiveConf conf, + boolean isLoad) throws SemanticException { + ArrayList<String> diffList = new ArrayList<>(); + for (String oldPath : oldList) { + if (!newList.contains(oldPath)) { + if (isLoad) { + diffList.add(externalTableDataPath(conf, getExternalTableBaseDir(conf), new Path(oldPath)).toString()); + } else { + diffList.add(oldPath); + } + diffList.add(oldPath); + } + } + return diffList; + } + + /** + * Cleans up any snapshots by computing diff between the list of snapshots between two replication dumps. + * @param dumpRoot Root of the dump + * @param snapshotPrefix prefix used by the snapshot + * @param conf Hive Configuration. + * @param snapshotCount the counter to store number of deleted snapshots. + * @param isLoad If this is called for clearing up snapshots at target cluster, In case of load it renames the + * snapshot file as well. 
* @throws IOException
   * @throws SemanticException
   */
  public static void cleanupSnapshots(Path dumpRoot, String snapshotPrefix, HiveConf conf,
      ReplSnapshotCount snapshotCount, boolean isLoad) throws IOException, SemanticException {
    DistributedFileSystem dfs = (DistributedFileSystem) dumpRoot.getFileSystem(conf);
    if (dfs.exists(new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD))) {
      // Diff the previous dump's snapshot path list against the current one to find paths
      // whose snapshots are stale and must be removed.
      FileList snapOld = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, conf);
      FileList snapNew = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf);
      ArrayList<String> oldPaths = SnapshotUtils.getListFromFileList(snapOld);
      ArrayList<String> newPaths = SnapshotUtils.getListFromFileList(snapNew);
      ArrayList<String> diffList = SnapshotUtils.getDiffList(newPaths, oldPaths, conf, isLoad);
      // On load the snapshots live under the target cluster's external table base dir,
      // so switch filesystems before cleaning up.
      dfs = isLoad ? (DistributedFileSystem) getExternalTableBaseDir(conf).getFileSystem(conf) : dfs;
      SnapshotUtils.cleanUpSnapshots(dfs, diffList, snapshotPrefix, snapshotCount);
    }
    if (isLoad) {
      // On the target, rotate the tracking files: drop the OLD list and promote CURRENT to OLD
      // so the next load diffs against this cycle's paths.
      // NOTE(review): at this point dfs may have been reassigned to the external-table-base
      // filesystem above, while dumpRoot belongs to the dump filesystem — presumably both
      // resolve to the same cluster during load; verify against the caller.
      try {
        dfs.delete((new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD)), true);
      } catch (FileNotFoundException fnf) {
        // ignore
        LOG.warn("Failed to clean up snapshot " + EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD, fnf);
      }
      try {
        dfs.rename(new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT),
            new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_OLD), Options.Rename.OVERWRITE);
      } catch (FileNotFoundException fnf) {
        // ignore — CURRENT may be absent on a first-cycle load.
        LOG.warn("Failed to clean up snapshot " + EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, fnf);
      }
    }
  }

  // Returns true when the target path was modified after the given snapshot was taken,
  // i.e. a reverse diff (rdiff) is required before applying the snapshot diff.
  public static boolean shouldRdiff(Path p, HiveConf conf, String snapshot) {
    try {
      DistributedFileSystem dfs = getDFS(p, conf);
      // check if the target got modified
      // An empty "to" snapshot name diffs against the current state; lastIndex > -1 means
      // the diff report listing is non-trivial, i.e. changes exist past the snapshot.
      int index =
          dfs.getClient().getSnapshotDiffReportListing(p.toUri().getPath(), snapshot, "", DFSUtilClient.EMPTY_BYTES, -1)
              .getLastIndex();

      return index > -1;
    } catch (Exception e) {
return false; + } + } + + public static void deleteReplRelatedSnapshots(FileSystem fs, Path path) { + try { + FileStatus[] listing = fs.listStatus(new Path(path, ".snapshot")); + for (FileStatus elem : listing) { + if (elem.getPath().getName().contains(OLD_SNAPSHOT) || elem.getPath().getName().contains(NEW_SNAPSHOT)) { + deleteSnapshotSafe((DistributedFileSystem) fs, path, elem.getPath().getName()); Review comment: Changed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 592953) Time Spent: 4h 20m (was: 4h 10m) > Add support for Snapshots during external table replication > ----------------------------------------------------------- > > Key: HIVE-24852 > URL: https://issues.apache.org/jira/browse/HIVE-24852 > Project: Hive > Issue Type: Improvement > Reporter: Ayush Saxena > Assignee: Ayush Saxena > Priority: Critical > Labels: pull-request-available > Attachments: Design Doc HDFS Snapshots for External Table > Replication-01.pdf > > Time Spent: 4h 20m > Remaining Estimate: 0h > > Add support for use of snapshot diff for external table replication. -- This message was sent by Atlassian Jira (v8.3.4#803005)