[ 
https://issues.apache.org/jira/browse/HIVE-24918?focusedWorklogId=626319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-626319
 ]

ASF GitHub Bot logged work on HIVE-24918:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jul/21 20:43
            Start Date: 21/Jul/21 20:43
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r674235129



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -200,6 +203,233 @@ private void testTargetDbReplIncompatible(boolean 
setReplIncompProp) throws Thro
     }
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;

Review comment:
       Declare at line 212 only.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -552,6 +648,31 @@ private boolean isTableSatifiesConfig(Table table) {
     return true;
   }
 
+  private void fetchFailoverMetadata(Hive hiveDb) throws HiveException, 
TException {
+    FailoverMetaData fmd = new FailoverMetaData(
+            new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR), 
conf);
+    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), 
work.dbNameOrPattern);
+    if (!txnsForDb.isEmpty()) {
+      hiveDb.abortTransactions(txnsForDb);
+    }
+    fmd.setAbortedTxns(txnsForDb);
+    fmd.setCursorPoint(currentNotificationId(hiveDb));
+    ValidTxnList failoverTxns = getTxnMgr().getValidTxns(excludedTxns);

Review comment:
       failoverTxns --> allValidTxns ? 

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -552,6 +648,31 @@ private boolean isTableSatifiesConfig(Table table) {
     return true;
   }
 
+  private void fetchFailoverMetadata(Hive hiveDb) throws HiveException, 
TException {
+    FailoverMetaData fmd = new FailoverMetaData(
+            new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR), 
conf);
+    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), 
work.dbNameOrPattern);
+    if (!txnsForDb.isEmpty()) {
+      hiveDb.abortTransactions(txnsForDb);
+    }
+    fmd.setAbortedTxns(txnsForDb);
+    fmd.setCursorPoint(currentNotificationId(hiveDb));
+    ValidTxnList failoverTxns = getTxnMgr().getValidTxns(excludedTxns);
+    List<Long> openTxns = getOpenTxns(failoverTxns);
+    fmd.setOpenTxns(openTxns);
+    fmd.setTxnsWithoutLock(getTxnsNotPresentInHiveLocksTable(openTxns));
+    txnsForDb = getOpenTxns(failoverTxns, work.dbNameOrPattern);
+    if (!txnsForDb.isEmpty()) {
+      LOG.warn("Txns: " + txnsForDb + " initiated for database: "
+              + work.dbNameOrPattern + " while failover is in progress.");

Review comment:
       You are also aborting these; mention that in the log message too.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/FailoverMetaData.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailoverMetaData {
+    public static final String FAILOVER_METADATA = "_failovermetadata";
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverMetaData.class);
+
+    private static ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+    @JsonProperty
+    private Long failoverEventId = null;
+    @JsonProperty
+    private Long cursorPoint = null;
+    @JsonProperty
+    private List<Long> abortedTxns;
+    @JsonProperty
+    private List<Long> openTxns;
+    @JsonProperty
+    private List<Long> txnsWithoutLock;
+
+    @JsonIgnore
+    private volatile boolean initialized = false;
+    @JsonIgnore
+    private final Path metadataFile;
+    @JsonIgnore
+    private final HiveConf hiveConf;
+
+    public FailoverMetaData() {
+        metadataFile = null;
+        hiveConf = null;
+    }
+
+    public FailoverMetaData(Path dumpDir, HiveConf hiveConf) {
+        this.hiveConf = hiveConf;
+        this.metadataFile = new Path(dumpDir, FAILOVER_METADATA);
+    }
+
+    private void initializeIfNot() throws SemanticException {
+        if (!initialized) {
+            loadMetadataFromFile();
+            initialized = true;
+        }
+    }
+
+    public void setMetaData(FailoverMetaData otherDMD) {
+        this.failoverEventId = otherDMD.failoverEventId;
+        this.abortedTxns = otherDMD.abortedTxns;
+        this.openTxns = otherDMD.openTxns;
+        this.cursorPoint = otherDMD.cursorPoint;
+        this.txnsWithoutLock = otherDMD.txnsWithoutLock;
+        this.initialized = true;
+    }
+
+    private synchronized void loadMetadataFromFile() throws SemanticException {

Review comment:
       When are we using it from multiple threads?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/FailoverMetaData.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailoverMetaData {
+    public static final String FAILOVER_METADATA = "_failovermetadata";
+    private static final Logger LOG = 
LoggerFactory.getLogger(FailoverMetaData.class);
+
+    private static ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+    @JsonProperty
+    private Long failoverEventId = null;
+    @JsonProperty
+    private Long cursorPoint = null;
+    @JsonProperty
+    private List<Long> abortedTxns;
+    @JsonProperty
+    private List<Long> openTxns;
+    @JsonProperty
+    private List<Long> txnsWithoutLock;
+
+    @JsonIgnore
+    private boolean initialized = false;
+    @JsonIgnore
+    private final Path dumpFile;
+    @JsonIgnore
+    private final HiveConf hiveConf;
+
+    public FailoverMetaData() {
+        //to be instantiated by JSON ObjectMapper.
+        dumpFile = null;
+        hiveConf = null;

Review comment:
       This is not addressed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -242,6 +253,56 @@ public int execute() {
     return 0;
   }
 
+  private void rollbackFailover(Path failoverReadyMarker, Path 
failoverMetadataFile, Database db)
+          throws HiveException, IOException {
+    LOG.info("Rolling back failover initiated in previous dump iteration.");
+    FileSystem fs = failoverMetadataFile.getFileSystem(conf);
+    if (failoverMetadataFile != null) {

Review comment:
       When would failoverMetadataFile be null? Isn't that an error 
case?
   

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -173,22 +178,24 @@ public int execute() {
           return 
ErrorMsg.REPL_FAILED_WITH_NON_RECOVERABLE_ERROR.getErrorCode();
         }
         Path previousValidHiveDumpPath = 
getPreviousValidDumpMetadataPath(dumpRoot);
-        boolean isBootstrap = (previousValidHiveDumpPath == null);
-        work.setBootstrap(isBootstrap);
+        work.setBootstrap(previousValidHiveDumpPath == null);
         if (previousValidHiveDumpPath != null) {
           work.setOldReplScope(new DumpMetaData(previousValidHiveDumpPath, 
conf).getReplScope());
         }
-        //If no previous dump is present or previous dump is already loaded, 
proceed with the dump operation.
-        if (shouldDump(previousValidHiveDumpPath)) {
-          Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
+        boolean isPrevDumpFailoverReady = 
checkFailoverStatus(previousValidHiveDumpPath);

Review comment:
       Why do both checkFailoverStatus and getCurrentDumpPath have to handle 
rollback?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -268,11 +329,35 @@ private boolean shouldDumpAtlasMetadata() {
     return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
   }
 
-  private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws 
IOException {
+  private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws 
IOException, HiveException {
     Path lastDumpPath = ReplUtils.getLatestDumpPath(dumpRoot, conf);
     if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, 
isBootstrap)) {
       //Resume previous dump
       LOG.info("Resuming the dump with existing dump directory {}", 
lastDumpPath);
+      FileSystem fs = lastDumpPath.getFileSystem(conf);
+      Path hiveDumpDir = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+      Path failoverMetadataFile = new Path(hiveDumpDir, 
FailoverMetaData.FAILOVER_METADATA);
+      Path failoverReadyMarkerFile = new Path(hiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString());
+      if (fs.exists(failoverReadyMarkerFile)) {
+        //If failoverReadyMarkerFile exists, this means previous dump 
iteration failed while creating dump ACK file.
+        //So, just delete this file and proceed further.
+        LOG.info("Deleting failover ready marker file: {} created in previous 
dump iteration.", failoverReadyMarkerFile);
+        fs.delete(failoverReadyMarkerFile, true);
+      }
+      if (fs.exists(failoverMetadataFile)) {
+        //If failoverMetadata file exists, this means previous dump iteration 
failed after writing failover metadata info
+        //Now, if the failover start config is enabled, then just use the same 
metadata in current iteration also.
+        //    Else just rollback failover initiated in previous failed dump 
iteration.
+        if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {
+          FailoverMetaData fmd = new FailoverMetaData(hiveDumpDir, conf);
+          if (fmd.isValidMetadata()) {

Review comment:
       What if not valid?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -242,6 +253,56 @@ public int execute() {
     return 0;
   }
 
+  private void rollbackFailover(Path failoverReadyMarker, Path 
failoverMetadataFile, Database db)
+          throws HiveException, IOException {
+    LOG.info("Rolling back failover initiated in previous dump iteration.");
+    FileSystem fs = failoverMetadataFile.getFileSystem(conf);
+    if (failoverMetadataFile != null) {
+      fs.delete(failoverMetadataFile, true);
+    }
+    if (failoverReadyMarker != null) {
+      fs.delete(failoverReadyMarker, true);
+    }
+    unsetReplFailoverEnabledIfSet(db);
+  }
+
+  private boolean checkFailoverStatus(Path previousValidHiveDumpPath) throws 
HiveException, IOException {

Review comment:
       Why is checkFailoverStatus doing multiple things, especially changing 
state? At least the name doesn't suggest that.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -552,6 +648,31 @@ private boolean isTableSatifiesConfig(Table table) {
     return true;
   }
 
+  private void fetchFailoverMetadata(Hive hiveDb) throws HiveException, 
TException {
+    FailoverMetaData fmd = new FailoverMetaData(
+            new Path(work.getCurrentDumpPath(), ReplUtils.REPL_HIVE_BASE_DIR), 
conf);
+    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), 
work.dbNameOrPattern);
+    if (!txnsForDb.isEmpty()) {
+      hiveDb.abortTransactions(txnsForDb);

Review comment:
       At least in debug mode a log statement is required

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -424,11 +518,13 @@ private boolean validDump(Path dumpDir) throws 
IOException {
     return false;
   }
 
-  private boolean shouldDump(Path previousDumpPath) throws IOException {
+  private boolean shouldDump(Path previousDumpPath, boolean 
isPrevDumpFailoverReady) throws IOException {
     //If no previous dump means bootstrap. So return true as there was no
     //previous dump to load
     if (previousDumpPath == null) {
       return true;
+    } else if (isPrevDumpFailoverReady) {
+      return false;

Review comment:
       What if the previous run failed right before dump ack creation? 
The failover ready marker will still be there, and it must go for another dump, 
no?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -268,11 +329,35 @@ private boolean shouldDumpAtlasMetadata() {
     return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA);
   }
 
-  private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws 
IOException {
+  private Path getCurrentDumpPath(Path dumpRoot, boolean isBootstrap) throws 
IOException, HiveException {
     Path lastDumpPath = ReplUtils.getLatestDumpPath(dumpRoot, conf);
     if (lastDumpPath != null && shouldResumePreviousDump(lastDumpPath, 
isBootstrap)) {
       //Resume previous dump
       LOG.info("Resuming the dump with existing dump directory {}", 
lastDumpPath);
+      FileSystem fs = lastDumpPath.getFileSystem(conf);
+      Path hiveDumpDir = new Path(lastDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+      Path failoverMetadataFile = new Path(hiveDumpDir, 
FailoverMetaData.FAILOVER_METADATA);
+      Path failoverReadyMarkerFile = new Path(hiveDumpDir, 
ReplAck.FAILOVER_READY_MARKER.toString());
+      if (fs.exists(failoverReadyMarkerFile)) {
+        //If failoverReadyMarkerFile exists, this means previous dump 
iteration failed while creating dump ACK file.
+        //So, just delete this file and proceed further.
+        LOG.info("Deleting failover ready marker file: {} created in previous 
dump iteration.", failoverReadyMarkerFile);
+        fs.delete(failoverReadyMarkerFile, true);
+      }
+      if (fs.exists(failoverMetadataFile)) {
+        //If failoverMetadata file exists, this means previous dump iteration 
failed after writing failover metadata info

Review comment:
       "this means previous dump iteration failed" - it could also mean the user 
is going for rollback, no?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -320,12 +405,17 @@ private void addAtlasDumpTask(boolean bootstrap, Path 
prevHiveDumpDir) {
 
 
   private void finishRemainingTasks() throws SemanticException {
-    Path dumpAckFile = new Path(work.getCurrentDumpPath(),
-            ReplUtils.REPL_HIVE_BASE_DIR + File.separator
-                    + ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    boolean isFailoverInProgress = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START) && 
!work.isBootstrap();
+    if (isFailoverInProgress) {
+      Utils.create(new Path(work.getCurrentDumpPath(), 
ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+              + ReplAck.FAILOVER_READY_MARKER), conf);
+      LOG.info("Dump marked as failover ready");
+    }
+    Path dumpAckFile = new Path(work.getCurrentDumpPath(), 
ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+                    + ReplAck.DUMP_ACKNOWLEDGEMENT);
     Utils.create(dumpAckFile, conf);
     prepareReturnValues(work.getResultValues());
-    work.getMetricCollector().reportEnd(Status.SUCCESS);
+    work.getMetricCollector().reportEnd(isFailoverInProgress ? 
Status.FAILOVER_READY : Status.SUCCESS);

Review comment:
       Status.FAILOVER_READY sounds like Status.SUCCESS + failover ready state. 
I think it should have been an additional state field, something like 
FAILOVER_STATE: STARTED/COMPLETED/ABORTED.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 626319)
    Time Spent: 5.5h  (was: 5h 20m)

> Handle failover case during Repl Dump
> -------------------------------------
>
>                 Key: HIVE-24918
>                 URL: https://issues.apache.org/jira/browse/HIVE-24918
>             Project: Hive
>          Issue Type: New Feature
>            Reporter: Haymant Mangla
>            Assignee: Haymant Mangla
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> To handle:
> a) Whenever the user wants to go ahead with failover, the next or 
> subsequent repl dump operation should, upon confirming that there are no 
> pending open transaction events, create a _failover_ready marker file in the 
> dump dir. This marker file would contain the name of the scheduled query
> that generated this dump.
> b) Skip subsequent repl dump instances once the marker file is in place.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to