[ 
https://issues.apache.org/jira/browse/HIVE-25708?focusedWorklogId=694851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-694851
 ]

ASF GitHub Bot logged work on HIVE-25708:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Dec/21 05:55
            Start Date: 13/Dec/21 05:55
    Worklog Time Spent: 10m 
      Work Description: ayushtkn commented on a change in pull request #2795:
URL: https://github.com/apache/hive/pull/2795#discussion_r767429781



##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import 
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {
+
+  public static final String FILE_ENTRY_SEPERATOR = "#";
+  private static Logger LOG = 
LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+  /** table diff directory when in progress */
+  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+  /** table diff directory when complete */
+  public static final String TABLE_DIFF_COMPLETE_DIRECTORY = 
"table_diff_complete";
+
+  /** event ack file which contains the event id till which the cluster was 
last loaded. */
+  public static final String EVENT_ACK_FILE = "event_ack";
+
+  /**
+   * Gets & checks whether the database is target of replication.
+   * @param dbName name of database
+   * @param hive hive object
+   * @return true, if the database has repl.target.for property set.
+   * @throws HiveException
+   */
+  public static boolean isFailover(String dbName, Hive hive) throws 
HiveException {
+    Database database = hive.getDatabase(dbName);
+    return database != null ? MetaStoreUtils.isTargetOfReplication(database) : 
false;
+  }
+
+  public static boolean checkFileExists(Path dumpPath, HiveConf conf, String 
fileName) throws IOException {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    return fs.exists(new Path(dumpPath, fileName));
+  }
+
+  /**
+   * Gets the event id from the event ack file
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return the event id from file.
+   * @throws IOException
+   */
+  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws 
IOException {
+    String lastEventId;
+    Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
+    FileSystem fs = eventAckFilePath.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
+      lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+  }
+
+  /**
+   * Gets the name of tables in the table diff file.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return Set with list of tables
+   * @throws Exception
+   */
+  public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, 
HiveConf conf) throws Exception {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    FileStatus[] list = fs.listStatus(tableDiffPath);
+    HashSet<String> tables = new HashSet<>();

Review comment:
       In general it shouldn't happen. But I don't want duplicates either, so I 
want to design it in such a way that it can't have a duplicate.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import 
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  
some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  
some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new 
Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table 
diff should be created empty, without any
+    // error.
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do a incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got 
modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, 
tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create one external and one managed table and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are 
loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first 
incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with 
first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in 
the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as 
well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore, it is expected due to quota violation.
+    }
+

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on another database with similar table names & 
some modifications on the original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified tables, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new 
Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table 
diff should be created empty, without any
+    // error.
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got 
modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, 
tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create one external and one managed table and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are 
loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first 
incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with 
first incremental dump pending "));

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on another database with similar table names & 
some modifications on the original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified tables, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new 
Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table 
diff should be created empty, without any
+    // error.
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got 
modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, 
tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are 
loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first 
incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with 
first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in 
the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as 
well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore; this is expected due to the quota violation.
+    }

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  
some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exists.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new 
Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table 
diff should be created empty, without any
+    // error.
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got 
modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, 
tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are 
loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first 
incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with 
first incremental dump pending "));
+    }
+  }
+
+  @Test
+  public void testFailureCasesInTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some table & do an incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('A')")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and check the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on the source cluster, so we have some entries in 
the table diff.
+    primary.run("use " + primaryDbName)
+        .run("create table t2_managed (id string)")
+        .run("insert into table t1_managed values ('S')")
+        .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+    // Do some modifications in another database to have unrelated events as 
well after the last load, which should
+    // get filtered.
+
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (15),(1),(96)")
+        .run("create  table t1_managed (id string)")
+        .run("insert into table t1_managed values ('SA'),('PS')");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "reverse");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Trigger dump on target cluster.
+
+    replicaFs.setQuota(newReplDir, 1, 10000);
+    try {
+      tuple = replica.dump(replicatedDbName, withClause);
+      fail("Should have failed due to quota violation");
+    } catch (Exception e) {
+      // Ignore; this is expected due to the quota violation.
+    }
+
+    // Remove the quota, i.e. resolve the cause of the failure, and retry the dump.
+    replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+
+    // Retry Dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check event ack file now gets created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Set quota again to restrict creation of table diff in middle during 
load.
+    replicaFs.setQuota(newReplDir, 
replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2, 
QUOTA_RESET);
+
+    try {
+      primary.load(primaryDbName, replicatedDbName, withClause);
+    } catch (Exception e) {
+      // Ignore, expected due to quota violation.
+    }
+
+    // Check table diff in progress directory gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+    // Check table diff complete directory doesn't get created.
+    assertFalse(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Fix the quota issue and re-attempt.

Review comment:
       Done

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't5_managed'")
+        .verifyResult("t5_managed")
+        .run("show tables like 't6_managed'")
+        .verifyResult("t6_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do some modifications on other database with similar table names &  
some modifications on original source
+    // cluster.
+    primary.run("create database " + extraPrimaryDb)
+        .run("use " + extraPrimaryDb)
+        .run("create external table t1 (id int)")
+        .run("create table t1_managed (id int)")
+        .run("use " + primaryDbName)
+        .run("create external table t4 (id int)")
+        .run("insert into table t4 values (100)")
+        .run("insert into table t4 values (201)")
+        .run("create table t4_managed (id int)")
+        .run("insert into table t4_managed values (110)")
+        .run("insert into table t4_managed values (220)")
+        .run("insert into table t2 partition(country='france') values 
('lyon')")
+        .run("insert into table t2_managed partition(country='france') values 
('nice')")
+        .run("alter table t6_managed add columns (name string)")
+        .run("drop table t5_managed");
+
+    // Prepare for reverse replication.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Check the event ack file got created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Check in case the dump isn't consumed, and we attempt a dump again, 
that gets skipped and the dump directory
+    // doesn't change, without any errors.
+
+    ContentSummary beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName, 
withClause);
+    assertTrue(emptyTuple.dumpLocation.isEmpty());
+    assertTrue(emptyTuple.lastReplicationId.isEmpty());
+    ContentSummary afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check the event ack file stays intact, despite having a skipped dump.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a load, this should create a table_diff_complete directory
+    primary.load(primaryDbName,replicatedDbName, withClause);
+
+    // Check the table diff directory exist.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff has all the modified table, including the dropped 
and empty ones
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+
+    // Do a load again and see, nothing changes as this load isn't consumed.
+    beforeContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    primary.load(primaryDbName, replicatedDbName, withClause);
+    assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+        .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed", 
"t5_managed", "t6_managed")));
+    afterContentSummary = replicaFs.getContentSummary(new 
Path(tuple.dumpLocation).getParent());
+    assertEquals(beforeContentSummary.getFileAndDirectoryCount(), 
afterContentSummary.getFileAndDirectoryCount());
+
+    // Check there are entries in the table files.
+    assertFalse(getPathsFromTableFile("t4", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2", new Path(tuple.dumpLocation), 
conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t4_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertFalse(getPathsFromTableFile("t2_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+
+    // Check the dropped and empty tables.
+    assertTrue(getPathsFromTableFile("t5_managed", new 
Path(tuple.dumpLocation), conf).isEmpty());
+    assertTrue(getPathsFromTableFile("t6_managed", new 
Path(tuple.dumpLocation), conf).size() == 1);
+  }
+
+  @Test
+  public void testEmptyDiffForControlFailover() throws Throwable {
+
+    // In case of control failover both A & B will be in sync, so the table 
diff should be created empty, without any
+    // error.
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+
+    // Do a bootstrap cycle(A->B)
+    primary.dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do a incremental dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (100),(200)")
+        .run("insert into table t1 values (12),(35),(46)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (120)")
+        .run("insert into table t1_managed values (10),(321),(423)")
+        .dump(primaryDbName, withClause);
+
+    // Do an incremental load and see all the tables are there.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed");
+
+    // Trigger reverse cycle. Do dump on target cluster.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "rev");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a reverse dump
+    tuple = replica.dump(replicatedDbName, withClause);
+
+    // Even though no diff, the event ack file should be created.
+    assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " 
doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Do a reverse load.
+    primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff directory still gets created.
+    assertTrue(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, 
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+    // Check the table diff is empty, since we are in sync, so no tables got 
modified.
+    HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new 
Path(tuple.dumpLocation), conf);
+    assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0, 
tableDiffEntries.size());
+  }
+
+  @Test
+  public void testFirstIncrementalMandatory() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create one external and one managed tables and do a bootstrap dump.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create  table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .dump(primaryDbName, withClause);
+
+    // Do a bootstrap load and check both managed and external tables are 
loaded.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Trigger reverse dump just after the bootstrap cycle.
+    DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+    Path newReplDir = new Path(replica.repldDir + "1");
+    replicaFs.mkdirs(newReplDir);
+    withClause = ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
newReplDir + "'");
+
+    // Do a dump on cluster B, it should throw an exception, since the first 
incremental isn't done yet.
+    try {
+      replica.dump(replicatedDbName, withClause);
+    } catch (HiveException he) {
+      assertTrue(he.getMessage()
+          .contains("Replication dump not allowed for replicated database with 
first incremental dump pending "));
+    }
+  }

Review comment:
       Done

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import 
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+public class OptimisedBootstrapUtils {
+
+  public static final String FILE_ENTRY_SEPERATOR = "#";
+  private static Logger LOG = 
LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+  /** table diff directory when in progress */
+  public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+  /** table diff directory when complete */
+  public static final String TABLE_DIFF_COMPLETE_DIRECTORY = 
"table_diff_complete";
+
+  /** event ack file which contains the event id till which the cluster was 
last loaded. */
+  public static final String EVENT_ACK_FILE = "event_ack";
+
+  /**
+   * Gets & checks whether the database is target of replication.
+   * @param dbName name of database
+   * @param hive hive object
+   * @return true, if the database has repl.target.for property set.
+   * @throws HiveException
+   */
+  public static boolean isFailover(String dbName, Hive hive) throws 
HiveException {
+    Database database = hive.getDatabase(dbName);
+    return database != null ? MetaStoreUtils.isTargetOfReplication(database) : 
false;
+  }
+
+  public static boolean checkFileExists(Path dumpPath, HiveConf conf, String 
fileName) throws IOException {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    return fs.exists(new Path(dumpPath, fileName));
+  }
+
+  /**
+   * Gets the event id from the event ack file
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return the event id from file.
+   * @throws IOException
+   */
+  public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws 
IOException {
+    String lastEventId;
+    Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
+    FileSystem fs = eventAckFilePath.getFileSystem(conf);
+    try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
+      lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+  }
+
+  /**
+   * Gets the name of tables in the table diff file.
+   * @param dumpPath the dump path
+   * @param conf the hive configuration
+   * @return Set with list of tables
+   * @throws Exception
+   */
+  public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath, 
HiveConf conf) throws Exception {
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    FileStatus[] list = fs.listStatus(tableDiffPath);
+    HashSet<String> tables = new HashSet<>();
+    for (FileStatus fStatus : list) {
+      tables.add(fStatus.getPath().getName());
+    }
+    return tables;
+  }
+
+  /**
+   * Extracts the recursive listing from the table file.
+   * @param file the name of table
+   * @param dumpPath the dump path
+   * @param conf the hive conf
+   * @return the list of paths in the table.
+   * @throws IOException
+   */
+  public static HashSet<String> getPathsFromTableFile(String file, Path 
dumpPath, HiveConf conf) throws IOException {
+    HashSet<String> paths = new HashSet<>();
+    FileSystem fs = dumpPath.getFileSystem(conf);
+    Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+    Path filePath = new Path(tableDiffPath, file);
+    String allEntries;
+    try (FSDataInputStream stream = fs.open(filePath);) {
+      allEntries = IOUtils.toString(stream, Charset.defaultCharset());
+    }
+    
paths.addAll(Arrays.asList(allEntries.split(System.lineSeparator())).stream().filter(item
 -> !item.isEmpty())
+        .collect(Collectors.toSet()));
+    return paths;
+  }
+
+  /**
+   * Gets the event id stored in database denoting the last loaded event id.
+   * @param dbName the name of database
+   * @param hiveDb the hive object
+   * @return event id from the database
+   * @throws HiveException
+   */
+  public static String getReplEventIdFromDatabase(String dbName, Hive hiveDb) 
throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    String currentLastEventId = 
getLastReplicatedStateFromParameters(database.getParameters());
+    return currentLastEventId;
+  }
+
+  /**
+   * Validates if the first incremental is done before starting optimised 
bootstrap
+   * @param dbName name of database
+   * @param hiveDb the hive object
+   * @throws HiveException
+   */
+  public static void isFirstIncrementalPending(String dbName, Hive hiveDb) 
throws HiveException {
+    Database database = hiveDb.getDatabase(dbName);
+    if (database == null || 
ReplUtils.isFirstIncPending(database.getParameters()))
+      throw new HiveException(
+          "Replication dump not allowed for replicated database with first 
incremental dump pending : " + dbName);
+  }
+
+  /**
+   * Creates the event ack file and sets the dump metadata post that marking 
completion of dump flow for first round
+   * of optimised failover dump.
+   * @param currentDumpPath the dump path
+   * @param dmd the dump metadata
+   * @param cmRoot the cmRoot
+   * @param dbEventId the database event id to which we have to write in the 
file.
+   * @param conf the hive configuration
+   * @param work the repldump work
+   * @return the lastReplId denoting a fake dump(-1) always
+   * @throws SemanticException
+   */
+  public static Long getAndCreateEventAckFile(Path currentDumpPath, 
DumpMetaData dmd, Path cmRoot, String dbEventId,

Review comment:
       Yeps, Changed!!!

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import 
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static 
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends 
BaseReplicationAcrossInstances {
+
+  String extraPrimaryDb;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    HashMap<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+        GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+    overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, 
"true");
+    overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, 
UserGroupInformation.getCurrentUser().getUserName());
+    
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, 
"true");
+
+    internalBeforeClassSetupExclusiveReplica(overrides, overrides, 
TestReplicationOptimisedBootstrap.class);
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    super.setup();
+    extraPrimaryDb = "extra_" + primaryDbName;
+  }
+
+  @After
+  public void tearDown() throws Throwable {
+    primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+    super.tearDown();
+  }
+
+  @Test
+  public void testBuildTableDiffGeneration() throws Throwable {
+    List<String> withClause = 
ReplicationTestUtils.includeExternalTableClause(true);
+    withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + 
primary.repldDir + "'");
+    // Create two external & two managed tables and do a bootstrap dump & load.
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+        .run("create external table t1 (id int)")
+        .run("insert into table t1 values (1)")
+        .run("insert into table t1 values (2),(3),(4)")
+        .run("create external table t2 (place string) partitioned by (country 
string)")
+        .run("insert into table t2 partition(country='india') values 
('chennai')")
+        .run("insert into table t2 partition(country='us') values ('new 
york')")
+        .run("create table t1_managed (id int)")
+        .run("insert into table t1_managed values (10)")
+        .run("insert into table t1_managed values (20),(31),(42)")
+        .run("create table t2_managed (place string) partitioned by (country 
string)")
+        .run("insert into table t2_managed partition(country='india') values 
('bangalore')")
+        .run("insert into table t2_managed partition(country='us') values 
('austin')")
+        .dump(primaryDbName, withClause);
+
+    // Do the bootstrap load and check all the external & managed tables are 
present.
+    replica.load(replicatedDbName, primaryDbName, withClause)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show tables like 't2'")
+        .verifyResult("t2")
+        .run("show tables like 't1_managed'")
+        .verifyResult("t1_managed")
+        .run("show tables like 't2_managed'")
+        .verifyResult("t2_managed")
+        .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load, Add one table which we can drop & an 
empty table as well.
+    tuple = primary.run("use " + primaryDbName)
+        .run("create table t5_managed (id int)")
+        .run("insert into table t5_managed values (110)")
+        .run("insert into table t5_managed values (110)")
+        .run("create table t6_managed (id int)")
+        .dump(primaryDbName, withClause);

Review comment:
       The dump directory will be different, so that policy won't be affected 
by it. In general, we don't allow that. The scenario is the same as an accidental 
dump & load on the wrong sites.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -680,6 +685,32 @@ private int executeIncrementalLoad(long loadStartTime) 
throws Exception {
     }
     Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
     Map<String, String> props = new HashMap<>();
+
+    // Check if it is an optimised bootstrap failover.

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
##########
@@ -680,6 +685,32 @@ private int executeIncrementalLoad(long loadStartTime) 
throws Exception {
     }
     Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
     Map<String, String> props = new HashMap<>();
+
+    // Check if it is an optimised bootstrap failover.
+    if (work.isFailover) {
+      // Check it should be marked as target of replication & not source of 
replication.
+      if (MetaStoreUtils.isTargetOfReplication(targetDb)) {
+        LOG.error("The database {} is already marked as target for 
replication", targetDb.getName());
+        throw new Exception("Failover target is already marked as target");
+      }
+      if (!ReplChangeManager.isSourceOfReplication(targetDb)) {
+        LOG.error("The database {} is already source of replication.", 
targetDb.getName());
+        throw new Exception("Failover target was not source of replication");
+      }
+      boolean isTableDiffPresent =
+          checkFileExists(new Path(work.dumpDirectory).getParent(), conf, 
TABLE_DIFF_COMPLETE_DIRECTORY);
+      Long eventId = Long.parseLong(getEventIdFromFile(new 
Path(work.dumpDirectory).getParent(), conf));
+      if (!isTableDiffPresent) {
+        prepareTableDiffFile(eventId, getHive(), work, conf);
+        if (this.childTasks == null) {
+          this.childTasks = new ArrayList<>();
+        }
+        createReplLoadCompleteAckTask();
+        return 0;
+      } else {
+        // TODO : Load using table_diff

Review comment:
       Done

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -385,7 +419,7 @@ private void finishRemainingTasks() throws 
SemanticException {
     Utils.create(dumpAckFile, conf);
     prepareReturnValues(work.getResultValues());
     work.getMetricCollector().reportEnd(isFailoverInProgress ? 
Status.FAILOVER_READY : Status.SUCCESS);
-    deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
+      deleteAllPreviousDumpMeta(work.getCurrentDumpPath());

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 694851)
    Time Spent: 0.5h  (was: 20m)

> Implement creation of table_diff
> --------------------------------
>
>                 Key: HIVE-25708
>                 URL: https://issues.apache.org/jira/browse/HIVE-25708
>             Project: Hive
>          Issue Type: Sub-task
>            Reporter: Ayush Saxena
>            Assignee: Ayush Saxena
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Generate the table_diff file with the list of tables modified on cluster A after 
> the last successfully loaded event id on B, which needs to be bootstrapped 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to