[ 
https://issues.apache.org/jira/browse/HIVE-23040?focusedWorklogId=425346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-425346
 ]

ASF GitHub Bot logged work on HIVE-23040:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Apr/20 14:54
            Start Date: 20/Apr/20 14:54
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #977:
URL: https://github.com/apache/hive/pull/977#discussion_r411444984



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,322 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written 
when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    Map<String, Long> eventModTimeMap = new HashMap<>();
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        eventModTimeMap.put(String.valueOf(eventId), 
fs.getFileStatus(eventRoot).getModificationTime());
+      }
+    }
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    //check events were not rewritten.
+    for(Map.Entry<String, Long> entry :eventModTimeMap.entrySet()) {
+      long oldModTime = entry.getValue();
+      long newModTime = fs.getFileStatus(new Path(hiveDumpDir, 
entry.getKey())).getModificationTime();
+      assertEquals(oldModTime, newModTime);
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2"});
+
+
+    //Case 2: When the last dump was half way through
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (4)")
+            .dump(primaryDbName);
+
+    hiveDumpDir = new Path(incrementalDump3.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+    //delete last three events and test if it recovers.
+    long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId);
+    Path lastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID));
+    Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID - 1));
+    Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID - 2));
+    assertTrue(fs.exists(lastEvtRoot));
+    assertTrue(fs.exists(secondLastEvtRoot));
+    assertTrue(fs.exists(thirdLastEvtRoot));
+
+    long lastEvtModTimeOld = 
fs.getFileStatus(lastEvtRoot).getModificationTime();
+    long secondLastEvtModTimeOld = 
fs.getFileStatus(secondLastEvtRoot).getModificationTime();
+    long thirdLastEvtModTimeOld = 
fs.getFileStatus(thirdLastEvtRoot).getModificationTime();
+
+    fs.delete(lastEvtRoot, true);
+    fs.delete(secondLastEvtRoot, true);
+    fs.delete(thirdLastEvtRoot, true);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    LAST_EVENT_ID_NAME,
+                    String.valueOf(lastEventID - 3)
+            )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, 
ackLastEventID, primary.hiveConf, true);
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+    verifyPathExist(fs, lastEvtRoot);
+    verifyPathExist(fs, secondLastEvtRoot);
+    verifyPathExist(fs, thirdLastEvtRoot);
+    assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > 
lastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > 
secondLastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > 
thirdLastEvtModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2", "4"});
+  }
+
+  @Test
+  public void testCheckpointingOnFirstEventDump() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {});
+
+    // Testing a scenario where first event was getting dumped and it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (1)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata");
+
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+    assertTrue(fs.exists(dumpMetaData));
+
+    fs.delete(ackFile, false);
+    fs.delete(ackLastEventID, false);
+    fs.delete(dumpMetaData, false);
+    //delete all the event folder except first one.
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID + 1; eventId<=lastIncEventID; eventId++) 
{
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        fs.delete(eventRoot, true);
+      }
+    }
+
+    Path firstIncEventRoot =  new Path(hiveDumpDir, 
String.valueOf(firstIncEventID));
+    long firstIncEventModTimeOld = 
fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    assertTrue(fs.exists(ackFile));
+    long firstIncEventModTimeNew =  
fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    assertTrue(firstIncEventModTimeNew > firstIncEventModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"});
+  }
+
+
+  @Test
+  public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() 
throws Throwable {
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + 
"'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + 
"'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + 
"'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .dump(primaryDbName, dumpClause);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2"});
+
+    dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + 
"'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + 
"'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + 
"'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("create table t2(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t2 values (3)")
+            .run("insert into t2 values (4)")
+            .run("insert into t2 values (5)")
+            .dump(primaryDbName, dumpClause);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path bootstrapDir = new Path(hiveDumpDir, 
ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+    Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME);
+
+    Path t2dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME + 
File.separator
+            + primaryDbName + File.separator + "t2");
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long oldT2DatadirModTime = 
fs.getFileStatus(t2dataDir).getModificationTime();
+
+    fs.delete(ackFile, false);
+
+    //Do another dump and test the rewrite happended for meta and no write for 
data folder
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName, dumpClause);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long newMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long newT2DatadirModTime = 
fs.getFileStatus(t2dataDir).getModificationTime();
+
+    assertTrue(newMetadirModTime > oldMetadirModTime);
+    assertEquals(oldT2DatadirModTime, newT2DatadirModTime);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"3", "4", "5"});
+  }
+
+  @Test
+  public void testHdfsMaxDirItemsLimitDuringIncremental() throws Throwable {
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"});
+
+    List<String> dumpClause = Arrays.asList("'" + 
ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='"
+            + (ReplUtils.RESERVED_DIR_ITEMS_COUNT + 5) +"'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t1 values (4)")
+            .run("insert into t1 values (5)")
+            .run("insert into t1 values (6)")
+            .run("insert into t1 values (7)")
+            .run("insert into t1 values (8)")
+            .run("insert into t1 values (9)")
+            .run("insert into t1 values (10)")
+            .run("create table t2(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t2 values (100)")
+            .dump(primaryDbName, dumpClause);
+
+    int eventCount = Integer.parseInt(incrementalDump1.lastReplicationId)
+            - Integer.parseInt(bootstrapDump.lastReplicationId);
+    assertEquals(eventCount, 5);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"100"});
+
+    dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + 
"'='1000'");
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName, dumpClause);
+
+    eventCount = Integer.parseInt(incrementalDump1.lastReplicationId)

Review comment:
       Actually, this is a redundant check.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 425346)
    Time Spent: 50m  (was: 40m)

> Checkpointing for repl dump incremental phase
> ---------------------------------------------
>
>                 Key: HIVE-23040
>                 URL: https://issues.apache.org/jira/browse/HIVE-23040
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Aasha Medhi
>            Assignee: PRAVIN KUMAR SINHA
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23040.01.patch, HIVE-23040.02.patch
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to