[ 
https://issues.apache.org/jira/browse/HIVE-24918?focusedWorklogId=627042&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-627042
 ]

ASF GitHub Bot logged work on HIVE-24918:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jul/21 07:34
            Start Date: 23/Jul/21 07:34
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r675326231



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -236,12 +251,55 @@ public int execute() {
       }
       catch (Exception ex){
         LOG.error("Failed to collect replication metrics: ", ex);
-        return errorCode;        
+        return errorCode;
       }
     }
     return 0;
   }
 
+  private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, 
Path currentHiveDumpDir,
+                                            boolean isPrevDumpFailoverReady) 
throws HiveException, IOException {
+    FileSystem fs = currentHiveDumpDir.getFileSystem(conf);
+    boolean shouldFailover = shouldFailover();
+    Database db = getHive().getDatabase(work.dbNameOrPattern);
+    if (isPrevDumpFailoverReady) {
+      boolean isDbFailedOver = MetaStoreUtils.isDbBeingFailedOver(db);
+      if (isDbFailedOver) {
+        //Since previous valid dump is failover ready and 
repl.failover.enabled is set for database, just rollback
+        // the failover process initiated in the previous iteration.
+        LOG.info("Rolling back failover initiated in previous dump 
iteration.");
+        fs.delete(new Path(previousValidHiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString()), true);
+      } else {
+        //Since previous valid dump is failover ready and 
repl.failover.enabled is not set for database, this means
+        //this is first dump operation in the reverse direction.
+        LOG.info("Switching to bootstrap dump as this is the first dump 
execution after failover.");
+        work.setFirstDumpAfterFailover(true);
+      }
+    } else if (work.shouldOverWrite()) {
+      //shouldOverWrite is set when previous failed dump iteration is resumed.
+      Path failoverMetadataFile = new Path(currentHiveDumpDir, 
FailoverMetaData.FAILOVER_METADATA);
+      Path failoverReadyMarkerFile = new Path(currentHiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString());
+      if (fs.exists(failoverReadyMarkerFile)) {
+        //If failoverReadyMarker exists, this means previous dump failed while 
creating dump ACK. Just delete and proceed.
+        LOG.info("Deleting failover ready marker file: {}.", 
failoverReadyMarkerFile);
+        fs.delete(failoverReadyMarkerFile, true);
+      }
+      if (fs.exists(failoverMetadataFile) && !shouldFailover) {
+        LOG.info("Rolling back failover initiated in previous dump 
iteration.");
+        fs.delete(failoverMetadataFile, true);
+      }
+    }
+    if (!shouldFailover) {
+      unsetReplFailoverEnabledIfSet(db);
+    }
+  }
+
+  private boolean isDumpFailoverReady(Path previousValidHiveDumpPath) throws 
HiveException, IOException {

Review comment:
       Does not throw HiveException. Remove it.

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -200,6 +203,232 @@ private void testTargetDbReplIncompatible(boolean 
setReplIncompProp) throws Thro
     }
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    List<String> failoverConfigs = Arrays.asList("'" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) 
tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication 
only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that have not acquired HIVE LOCKS or they belong to 
different db: These txns would be captured in
+     _failovermetadata file inside dump directory.
+     Case 2) Txns that have acquired HIVE LOCKS and belong to db under 
replication: These txns would be aborted by hive
+     as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, 
primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = 
allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, 
primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = 
allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump 
operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    Assert.assertEquals(dumpData.dumpLocation, 
ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    Assert.assertEquals(prevDumpDirModifTime, 
getLatestDumpDirModifTime(dbRootDir));
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Assert.assertEquals(new DumpMetaData(dumpPath, conf).getDumpType(), 
DumpType.INCREMENTAL);
+    Path failoverReadyFile = new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString());
+    Path failoverMdFile = new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA);
+    assertFalse(fs.exists(failoverReadyFile));
+    assertFalse(fs.exists(failoverMdFile));
+    
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(failoverReadyFile);
+    fs.create(failoverMdFile);
+    assertTrue(fs.exists(failoverReadyFile));
+    assertTrue(fs.exists(failoverMdFile));
+
+    //Since the failover start config is disabled and previous valid dump 
directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of 
incremental and last dump directory also should not
+    //deleted.
+    WarehouseInstance.Tuple newDumpData = primary.dump(primaryDbName);
+    assertNotEquals(newDumpData.dumpLocation, dumpData.dumpLocation);
+    assertTrue(fs.exists(dumpPath));
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    dumpPath = new Path(newDumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == 
DumpType.BOOTSTRAP);
+  }
+
+  private long getLatestDumpDirModifTime(Path dumpRoot) throws Exception {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    long latestModifTime = -1;
+    if (fs.exists(dumpRoot)) {
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() > latestModifTime) {
+          latestModifTime = status.getModificationTime();
+        }
+      }
+    }
+    return latestModifTime;
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + 
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets 
stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) 
tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication 
only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .dump(primaryDbName, failoverConfigs);
+
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
+    assertTrue(fs.exists(dumpAckFile));
+    assertTrue(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData previousFmd = new FailoverMetaData(dumpPath, conf);
+    Long failoverEventId = previousFmd.getFailoverEventId();
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+
+    fs.delete(dumpAckFile, false);
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, 
FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, 
ReplAck.FAILOVER_READY_MARKER.toString())));
+    
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));

Review comment:
       Add modTime check for failover_metadata file.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -424,11 +491,13 @@ private boolean validDump(Path dumpDir) throws 
IOException {
     return false;
   }
 
-  private boolean shouldDump(Path previousDumpPath) throws IOException {
+  private boolean shouldDump(Path previousDumpPath, boolean 
isFailoverMarkerPresent) throws IOException {
     //If no previous dump means bootstrap. So return true as there was no

Review comment:
       Can you add some comment here? like:
   /** a) If there is no previous dump dir found, the current run is bootstrap 
case.
        *  b) If the previous dump was successful and it contains failover 
marker file as well as
        *  HiveConf.ConfVars.HIVE_REPL_FAILOVER_START == true, last dump was a 
controlled failover dump,
        *  skip doing any further dump.
        */

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -173,22 +177,29 @@ public int execute() {
           return 
ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
         }
         Path previousValidHiveDumpPath = 
getPreviousValidDumpMetadataPath(dumpRoot);
-        boolean isBootstrap = (previousValidHiveDumpPath == null);
-        work.setBootstrap(isBootstrap);
-        if (previousValidHiveDumpPath != null) {
+        boolean isPrevDumpFailoverReady = false;

Review comment:
       Rename to isFailoverMarkerPresent

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -236,12 +251,55 @@ public int execute() {
       }
       catch (Exception ex){
         LOG.error("Failed to collect replication metrics: ", ex);
-        return errorCode;        
+        return errorCode;
       }
     }
     return 0;
   }
 
+  private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, 
Path currentHiveDumpDir,
+                                            boolean isPrevDumpFailoverReady) 
throws HiveException, IOException {
+    FileSystem fs = currentHiveDumpDir.getFileSystem(conf);
+    boolean shouldFailover = shouldFailover();
+    Database db = getHive().getDatabase(work.dbNameOrPattern);
+    if (isPrevDumpFailoverReady) {
+      boolean isDbFailedOver = MetaStoreUtils.isDbBeingFailedOver(db);
+      if (isDbFailedOver) {

Review comment:
       Use MetaStoreUtils.isDbBeingFailedOver(db)

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -236,12 +251,55 @@ public int execute() {
       }
       catch (Exception ex){
         LOG.error("Failed to collect replication metrics: ", ex);
-        return errorCode;        
+        return errorCode;
       }
     }
     return 0;
   }
 
+  private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, 
Path currentHiveDumpDir,
+                                            boolean isPrevDumpFailoverReady) 
throws HiveException, IOException {
+    FileSystem fs = currentHiveDumpDir.getFileSystem(conf);
+    boolean shouldFailover = shouldFailover();
+    Database db = getHive().getDatabase(work.dbNameOrPattern);
+    if (isPrevDumpFailoverReady) {
+      boolean isDbFailedOver = MetaStoreUtils.isDbBeingFailedOver(db);
+      if (isDbFailedOver) {
+        //Since previous valid dump is failover ready and 
repl.failover.enabled is set for database, just rollback
+        // the failover process initiated in the previous iteration.
+        LOG.info("Rolling back failover initiated in previous dump 
iteration.");
+        fs.delete(new Path(previousValidHiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString()), true);
+      } else {
+        //Since previous valid dump is failover ready and 
repl.failover.enabled is not set for database, this means
+        //this is first dump operation in the reverse direction.
+        LOG.info("Switching to bootstrap dump as this is the first dump 
execution after failover.");
+        work.setFirstDumpAfterFailover(true);
+      }
+    } else if (work.shouldOverWrite()) {
+      //shouldOverWrite is set when previous failed dump iteration is resumed.
+      Path failoverMetadataFile = new Path(currentHiveDumpDir, 
FailoverMetaData.FAILOVER_METADATA);
+      Path failoverReadyMarkerFile = new Path(currentHiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString());
+      if (fs.exists(failoverReadyMarkerFile)) {
+        //If failoverReadyMarker exists, this means previous dump failed while 
creating dump ACK. Just delete and proceed.
+        LOG.info("Deleting failover ready marker file: {}.", 
failoverReadyMarkerFile);
+        fs.delete(failoverReadyMarkerFile, true);
+      }
+      if (fs.exists(failoverMetadataFile) && !shouldFailover) {

Review comment:
       Not needed. It can help debug should the need arise




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 627042)
    Time Spent: 7h 40m  (was: 7.5h)

> Handle failover case during Repl Dump
> -------------------------------------
>
>                 Key: HIVE-24918
>                 URL: https://issues.apache.org/jira/browse/HIVE-24918
>             Project: Hive
>          Issue Type: New Feature
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> To handle:
>  a) Whenever user wants to go ahead with failover, during the next or 
> subsequent repl dump operation upon confirming that there are no pending open 
> transaction events, It should create a _failover_ready marker file in the 
> dump dir. This marker file would contain scheduled query name
> that has generated this dump.
> b) Skip next repl dump instances once we have the marker file placed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to