[ 
https://issues.apache.org/jira/browse/HIVE-23353?focusedWorklogId=434887&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-434887
 ]

ASF GitHub Bot logged work on HIVE-23353:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/20 10:55
            Start Date: 19/May/20 10:55
    Worklog Time Spent: 10m 
      Work Description: aasha commented on a change in pull request #1021:
URL: https://github.com/apache/hive/pull/1021#discussion_r427194647



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable {
+
+  protected static transient Logger LOG = LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging 
location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      atlasRestClient = 
clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), 
atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), 
atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, 
entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {

Review comment:
       Do you need this to be public?

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1568,123 @@ public void 
testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", 
e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+    verifyAtlasMetadataPresent();
+
+    confMap.remove("hive.repl.atlas.replicatedto");
+    replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+  }
+
+  @Test
+  public void testAtlasMissingConfigs() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)");
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");

Review comment:
       Use conf constants instead of hard-coded strings.
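
       For example, something along these lines (just a sketch; it assumes the
       corresponding HiveConf.ConfVars entries exist for these properties):

         confMap.put(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true");
         confMap.put(HiveConf.ConfVars.REPL_INCLUDE_ATLAS_METADATA.varname, "true");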

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1568,123 @@ public void 
testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", 
e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+    verifyAtlasMetadataPresent();
+
+    confMap.remove("hive.repl.atlas.replicatedto");
+    replica.load(replicatedDbName, primaryDbName, getAtlasClause(confMap))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"acid_table", "table1"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1", "2"});
+  }
+
+  @Test
+  public void testAtlasMissingConfigs() throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)");
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");
+    confMap.put("hive.repl.include.atlas.metadata", "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.endpoint", true);
+    confMap.put("hive.repl.atlas.endpoint", "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.replicatedto", true);
+    confMap.put("hive.repl.atlas.replicatedto", replicatedDbName);
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.source.cluster.name", true);
+    confMap.put("hive.repl.source.cluster.name", "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.target.cluster.name", true);
+    confMap.put("hive.repl.target.cluster.name", "cluster1");
+    primary.dump(primaryDbName, getAtlasClause(confMap));
+    verifyAtlasMetadataPresent();
+    confMap.clear();
+    confMap.put("hive.in.test", "true");
+    confMap.put("hive.repl.include.atlas.metadata", "true");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.atlas.endpoint", false);
+    confMap.put("hive.repl.atlas.endpoint", "http://localhost:21000/atlas");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.source.cluster.name", false);
+    confMap.put("hive.repl.source.cluster.name", "cluster0");
+    ensureFailedReplOperation(getAtlasClause(confMap), "hive.repl.target.cluster.name", false);
+    confMap.put("hive.repl.target.cluster.name", "cluster1");
+    primary.load(replicatedDbName, primaryDbName, getAtlasClause(confMap));
+  }
+
+  private void verifyAtlasMetadataPresent() throws IOException {
+    Path dbReplDir = new Path(primary.repldDir,
+            Base64.getEncoder().encodeToString(primaryDbName.toLowerCase().getBytes(StandardCharsets.UTF_8.name())));
+    FileSystem fs = FileSystem.get(dbReplDir.toUri(), primary.getConf());
+    assertTrue(fs.exists(dbReplDir));
+    FileStatus dumpRoots[]  = fs.listStatus(dbReplDir);
+    assert(dumpRoots.length == 1);
+    Path dumpRoot = dumpRoots[0].getPath();
+    assertTrue("Hive dump root doesn't exist", fs.exists(new Path(dumpRoot, 
ReplUtils.REPL_HIVE_BASE_DIR)));
+    Path atlasDumpRoot = new Path(dumpRoot, ReplUtils.REPL_ATLAS_BASE_DIR);
+    assertTrue("Atlas dump root doesn't exist", fs.exists(atlasDumpRoot));
+    assertTrue("Atlas export file doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, 
ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME)));
+    assertTrue("Atlas dump metadata doesn't exist",
+            fs.exists(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)));
+    BufferedReader br = null;
+    try {
+      br = new BufferedReader(new InputStreamReader(
+              fs.open(new Path(atlasDumpRoot, EximUtil.METADATA_NAME)), Charset.defaultCharset()));
+      String[] lineContents = br.readLine().split("\t", 5);
+      assertEquals(primary.hiveConf.get("fs.defaultFS"), lineContents[0]);
+      assertEquals(0, Long.parseLong(lineContents[1]));
+    } finally {
+      if (br != null)
+        br.close();
+    }
+  }
+
+  private void ensureFailedReplOperation(List<String> clause, String conf, boolean dump) throws Throwable {
+    try {
+      if (dump) {
+        primary.dump(primaryDbName, clause);
+      } else {
+        primary.load(replicatedDbName, primaryDbName, clause);
+      }
+      Assert.fail(conf + " is mandatory config for Atlas metadata replication but it didn't fail.");
+    } catch (SemanticException e) {
+      assertEquals(e.getMessage(), (conf + " is mandatory config for Atlas metadata replication"));
+    }
+  }
+
+  private Map<String, String> defaultAtlasConfMap() {
+    Map<String, String> confMap = new HashMap<>();
+    confMap.put("hive.in.test", "true");

Review comment:
       Use conf constants.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable 
{
+
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging 
location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      atlasRestClient = 
clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), 
atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), 
atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, 
entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String entityGuid) throws SemanticException {
+    AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws SemanticException {

Review comment:
       Most of the method scope can be private. It's not called from anywhere else.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable 
{
+
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging 
location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      atlasRestClient = 
clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), 
atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), 
atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, 
entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String 
entityGuid) throws SemanticException {
+    AtlasServer atlasServer = 
atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || 
atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws SemanticException {
+    LOG.info("HiveAtlasPlugin: Starting export from:{}", atlasReplInfo.getStagingDir());
+    try {
+      AtlasExportRequest exportRequest = atlasRequestBuilder.createExportRequest(atlasReplInfo,
+              atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getSrcCluster()));
+      InputStream inputStream = atlasRestClient.exportData(exportRequest);

Review comment:
       This is created here and closed elsewhere. It should be closed here.
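
       For example, a try-with-resources sketch (assuming the code that consumes
       the stream can move into the same method):

         try (InputStream inputStream = atlasRestClient.exportData(exportRequest)) {
           // write the exported data to the staging dir here, so the stream is
           // always closed in this method, even when the export fails midway
         }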

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new AtlasRestClientBuilder();

Review comment:
       It's only used for import. Is this reused? Do you need class-level variables?
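
       A sketch of the alternative, building the client locally in the method that
       uses it instead of keeping a field:

         AtlasRestClient client = new AtlasRestClientBuilder()
                 .getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf());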

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { 
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path 
prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;
+  }
+
+  public AtlasReplInfo getAtlasReplInfo() throws SemanticException{
+    String endpoint = getNonEmpty(HiveConf.ConfVars.REPL_ATLAS_ENDPOINT.varname);

Review comment:
       This should be validated to ensure it is a valid URL.
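
       For example (sketch only; plain java.net.URL validation is just one option):

         try {
           new URL(endpoint);
         } catch (MalformedURLException e) {
           throw new SemanticException("Invalid Atlas endpoint: " + endpoint, e);
         }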

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadTask.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Atlas Metadata Replication Load Task.
+ **/
+public class AtlasLoadTask extends Task<AtlasLoadWork> implements Serializable 
{
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  private transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasLoadTask.class);
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo  = work.getAtlasReplInfo();
+      LOG.info("Loading atlas metadata from srcDb: {} to tgtDb: {} from 
staging: {}",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      int importCount = importAtlasMetadata(atlasReplInfo);
+      LOG.info("HiveAtlasPlugin: Atlas entities import count {}", importCount);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while loading atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  private int importAtlasMetadata(AtlasReplInfo atlasReplInfo) throws Exception {
+    AtlasImportRequest importRequest = atlasRequestBuilder.createImportRequest(atlasReplInfo.getSrcDB(),
+            atlasReplInfo.getTgtDB(), atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getSrcCluster()),
+            atlasRequestBuilder.getAtlasClusterName(atlasReplInfo.getTgtCluster()), atlasReplInfo.getSrcFsUri(),
+            atlasReplInfo.getTgtFsUri(), atlasReplInfo.getSrcCluster());
+    AtlasImportResult result = clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), atlasReplInfo.getConf())
+            .importData(importRequest, atlasReplInfo);
+    if (result == null || result.getProcessedEntities() == null) {
+      LOG.info("HiveAtlasPlugin: No entities found");

Review comment:
       Is the HiveAtlasPlugin tag needed?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable 
{
+
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;

Review comment:
       Consider moving to constructor-based initialization. This class has other public methods which can get called without proper initialization.
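
       Roughly something like the sketch below (assumption: Task implementations are
       created reflectively, so a public no-arg constructor would still need to remain
       alongside this one):

         // package-private constructor so collaborators (and tests) can supply a
         // fully initialized client up front
         AtlasDumpTask(AtlasRestClientBuilder clientBuilder, AtlasRequestBuilder atlasRequestBuilder,
                       AtlasRestClient atlasRestClient) {
           this.clientBuilder = clientBuilder;
           this.atlasRequestBuilder = atlasRequestBuilder;
           this.atlasRestClient = atlasRestClient;
         }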

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable 
{
+
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging 
location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      atlasRestClient = 
clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), 
atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), 
atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, 
entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String 
entityGuid) throws SemanticException {

Review comment:
       You can get atlasReplInfo from work.
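
       i.e. roughly (sketch, reusing the existing body):

         private long getCurrentTimestamp(String entityGuid) throws SemanticException {
           AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
           AtlasServer atlasServer = atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
           long ret = (atlasServer == null || atlasServer.getAdditionalInfoRepl(entityGuid) == null)
                   ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
           LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
           return ret;
         }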

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRequestBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClient;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasRestClientBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Atlas Metadata Replication Dump Task.
+ **/
+public class AtlasDumpTask extends Task<AtlasDumpWork> implements Serializable 
{
+
+  protected static transient Logger LOG = 
LoggerFactory.getLogger(AtlasDumpTask.class);
+  private static final long serialVersionUID = 1L;
+  private transient AtlasRestClientBuilder clientBuilder = new 
AtlasRestClientBuilder();
+  protected transient AtlasRequestBuilder atlasRequestBuilder = new 
AtlasRequestBuilder();
+  private transient AtlasRestClient atlasRestClient = null;
+
+  @Override
+  public int execute() {
+    try {
+      AtlasReplInfo atlasReplInfo = work.getAtlasReplInfo();
+      LOG.info("Dumping Atlas metadata of srcDb: {}, for TgtDb: {} to staging 
location:",
+              atlasReplInfo.getSrcDB(), atlasReplInfo.getTgtDB(), 
atlasReplInfo.getStagingDir());
+      atlasRestClient = 
clientBuilder.getClient(atlasReplInfo.getAtlasEndpoint(), 
atlasReplInfo.getConf());
+      String entityGuid = checkHiveEntityGuid(atlasReplInfo.getSrcCluster(), 
atlasReplInfo.getSrcDB());
+      long currentModifiedTime = getCurrentTimestamp(atlasReplInfo, 
entityGuid);
+      dumpAtlasMetaData(atlasReplInfo);
+      createDumpMetadata(atlasReplInfo, currentModifiedTime);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Exception while dumping atlas metadata", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+  }
+
+  public long getCurrentTimestamp(AtlasReplInfo atlasReplInfo, String 
entityGuid) throws SemanticException {
+    AtlasServer atlasServer = 
atlasRestClient.getServer(atlasReplInfo.getSrcCluster());
+    long ret = (atlasServer == null || 
atlasServer.getAdditionalInfoRepl(entityGuid) == null)
+            ? 0L : (long) atlasServer.getAdditionalInfoRepl(entityGuid);
+    LOG.debug("HiveAtlasPlugin: fromTimestamp: {}", ret);
+    return ret;
+  }
+
+  public void dumpAtlasMetaData(AtlasReplInfo atlasReplInfo) throws 
SemanticException {

Review comment:
       Get atlasReplInfo from work.

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
##########
@@ -1561,4 +1562,94 @@ public void 
testFailureUnsupportedAuthorizerReplication() throws Throwable {
       assertEquals("Authorizer sentry not supported for replication ", 
e.getMessage());
     }
   }
+
+  //Testing just the configs and no impact on existing replication
+  @Test
+  public void testAtlasReplication() throws Throwable {
+    Map<String, String> confMap = defaultAtlasConfMap();
+    primary.run("use " + primaryDbName)
+            .run("create table  acid_table (key int, value int) partitioned by 
(load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc 
tblproperties ('transactional'='true')")
+            .run("create table table1 (i String)")
+            .run("insert into table1 values (1)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, getAtlasClause(defaultAtlasConfMap()));
+
+    confMap.remove("hive.repl.atlas.replicatedto");

Review comment:
       Then this conf needn't be removed?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.CLUSTER_NAME_SEPARATOR;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRequestBuilder.class);
+  public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+  static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+  static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+  private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+  private static final String ATTRIBUTE_NAME_NAME = ".name";
+  private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+  private static final String ATTRIBUTE_NAME_REPLICATED_FROM = 
"replicatedFrom";
+  private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+  private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_CLUSTER_NAME;
+  private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_NAME;
+  private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_LOCATION;
+  private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + 
ATTRIBUTE_NAME_LOCATION;
+
+  private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+  private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, 
String srcAtlasServer) {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, 
srcAtlasServer);
+    Map<String, Object> options = getOptions(atlasReplInfo);
+    return createRequest(itemsToExport, options);
+  }
+
+  public List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, 
String srcAtlasServerName) {

Review comment:
       Do you need all methods to be public?

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = 
"atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = 
"atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = 
"atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = 
"atlas.authentication.method.kerberos";
+  private static final String URL_SEPERATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS

Review comment:
       If Kerberos is the only strategy supported, do you need this?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpWork.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication work.
+ */
+@Explain(displayName = "Atlas Meta Data Dump Work", explainLevels = { 
Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AtlasDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final Path stagingDir;
+  private final HiveConf conf;
+  private final boolean bootstrap;
+  private final Path prevAtlasDumpDir;
+
+
+  public AtlasDumpWork(String srcDB, Path stagingDir, boolean bootstrap, Path 
prevAtlasDumpDir, HiveConf conf) {
+    this.srcDB = srcDB;
+    this.stagingDir = stagingDir;
+    this.bootstrap = bootstrap;
+    this.prevAtlasDumpDir = prevAtlasDumpDir;
+    this.conf = conf;

Review comment:
       Is the conf needed in the work? It's already there in the task.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasLoadWork.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.atlas.AtlasReplInfo;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+
+/**
+ * Atlas metadata replication load work.
+ */
+@Explain(displayName = "Atlas Meta Data Load Work", explainLevels = 
{Level.USER, Level.DEFAULT, Level.EXTENDED})
+public class AtlasLoadWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final String srcDB;
+  private final String tgtDB;
+  private final Path stagingDir;
+  private final HiveConf conf;

Review comment:
       This conf is already part of the task. Is this needed in the work as well? The conf won't be serializable.

##########
File path: 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRequestBuilder.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AttributeTransform;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.type.AtlasType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.CLUSTER_NAME_SEPARATOR;
+
+/**
+ * Helper class to create export/import request.
+ */
+public class AtlasRequestBuilder {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AtlasRequestBuilder.class);
+  public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+  static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+  static final String ATLAS_TYPE_HIVE_SD = "hive_storagedesc";
+  static final String QUALIFIED_NAME_FORMAT = "%s@%s";
+
+  private static final String ATTRIBUTE_NAME_CLUSTER_NAME = ".clusterName";
+  private static final String ATTRIBUTE_NAME_NAME = ".name";
+  private static final String ATTRIBUTE_NAME_REPLICATED_TO = "replicatedTo";
+  private static final String ATTRIBUTE_NAME_REPLICATED_FROM = 
"replicatedFrom";
+  private static final String ATTRIBUTE_NAME_LOCATION = ".location";
+
+  private static final String HIVE_DB_CLUSTER_NAME = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_CLUSTER_NAME;
+  private static final String HIVE_DB_NAME = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_NAME;
+  private static final String HIVE_DB_LOCATION = ATLAS_TYPE_HIVE_DB + 
ATTRIBUTE_NAME_LOCATION;
+  private static final String HIVE_SD_LOCATION = ATLAS_TYPE_HIVE_SD + 
ATTRIBUTE_NAME_LOCATION;
+
+  private static final String TRANSFORM_ENTITY_SCOPE = "__entity";
+  private static final String REPLICATED_TAG_NAME = "%s_replicated";
+
+  public AtlasExportRequest createExportRequest(AtlasReplInfo atlasReplInfo, 
String srcAtlasServer) {
+    List<AtlasObjectId> itemsToExport = getItemsToExport(atlasReplInfo, 
srcAtlasServer);
+    Map<String, Object> options = getOptions(atlasReplInfo);
+    return createRequest(itemsToExport, options);
+  }
+
+  public List<AtlasObjectId> getItemsToExport(AtlasReplInfo atlasReplInfo, 
String srcAtlasServerName) {
+    List<AtlasObjectId> atlasObjectIds = new ArrayList<>();
+    final String qualifiedName = getQualifiedName(srcAtlasServerName, 
atlasReplInfo.getSrcDB());
+    atlasObjectIds.add(new AtlasObjectId(ATLAS_TYPE_HIVE_DB, 
ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+    return atlasObjectIds;
+  }
+
+  private AtlasExportRequest createRequest(final List<AtlasObjectId> 
itemsToExport,
+                                           final Map<String, Object> options) {
+    AtlasExportRequest request = new AtlasExportRequest() {
+      {
+        setItemsToExport(itemsToExport);
+        setOptions(options);
+      }
+    };
+    LOG.debug("createRequest: {}" + request);
+    return request;
+  }
+
+  private Map<String, Object> getOptions(AtlasReplInfo atlasReplInfo) {
+    String targetCluster = atlasReplInfo.getTgtCluster();
+    Map<String, Object> options = new HashMap<>();
+    options.put(AtlasExportRequest.OPTION_FETCH_TYPE, 
AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
+    options.put(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, 
atlasReplInfo.getTimeStamp());
+    options.put(AtlasExportRequest.OPTION_SKIP_LINEAGE, true);
+    if (targetCluster != null && !targetCluster.isEmpty()) {
+      options.put(AtlasExportRequest.OPTION_KEY_REPLICATED_TO, targetCluster);
+    }
+    return options;
+  }
+
+  public AtlasObjectId getItemToExport(String srcCluster, String srcDB) {
+    final String qualifiedName = getQualifiedName(srcCluster, srcDB);
+    return new AtlasObjectId(ATLAS_TYPE_HIVE_DB, ATTRIBUTE_QUALIFIED_NAME, 
qualifiedName);
+  }
+
+  public String getQualifiedName(String clusterName, String srcDb) {
+    String qualifiedName = String.format(QUALIFIED_NAME_FORMAT, 
srcDb.toLowerCase(), clusterName);
+    LOG.debug("AtlasProcess: getQualifiedName: {}", qualifiedName);
+    return qualifiedName;
+  }
+
+  public AtlasImportRequest createImportRequest(String sourceDataSet, String 
targetDataSet,
+                                                String sourceClusterName, 
String targetClusterName,
+                                                String sourcefsEndpoint, 
String targetFsEndpoint,
+                                                String 
sourceClusterFullyQualifiedName) {
+    AtlasImportRequest request = new AtlasImportRequest();
+    addTransforms(request.getOptions(),
+            sourceClusterName, targetClusterName,
+            sourceDataSet, targetDataSet,
+            sourcefsEndpoint, targetFsEndpoint);
+    addReplicatedFrom(request.getOptions(), sourceClusterFullyQualifiedName);
+    LOG.debug("AtlasProcess: importRequest: {}" + request);
+    return request;
+  }
+
+  private void addTransforms(Map<String, String> options, String srcClusterName,
+                             String tgtClusterName, String sourceDataSet, String targetDataSet,
+                             String sourcefsEndpoint, String targetFsEndpoint) {
+    List<AttributeTransform> transforms = new ArrayList<>();
+    String sanitizedSourceClusterName = sanitizeForClassificationName(srcClusterName);
+    addClassificationTransform(transforms,
+            String.format(REPLICATED_TAG_NAME, sanitizedSourceClusterName));
+    addClearReplicationAttributesTransform(transforms);
+    addClusterRenameTransform(transforms, srcClusterName, tgtClusterName);
+    if (!sourceDataSet.equals(targetDataSet)) {
+      addDataSetRenameTransform(transforms, sourceDataSet, targetDataSet);
+    }
+    addLocationTransform(transforms, sourcefsEndpoint, targetFsEndpoint);
+    options.put(AtlasImportRequest.TRANSFORMERS_KEY, AtlasType.toJson(transforms));
+  }
+
+  private void addLocationTransform(List<AttributeTransform> transforms, String srcFsUri, String tgtFsUri) {
+    transforms.add(create(
+            HIVE_DB_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_DB_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+    transforms.add(create(
+            HIVE_SD_LOCATION, "STARTS_WITH_IGNORE_CASE: " + srcFsUri,
+            HIVE_SD_LOCATION, "REPLACE_PREFIX: = :" + srcFsUri + "=" + tgtFsUri
+            )
+    );
+  }
+
+  private void addDataSetRenameTransform(List<AttributeTransform> transforms,
+                                         String sourceDataSet, String targetDataSet) {
+    transforms.add(create(
+            HIVE_DB_NAME, "EQUALS: " + sourceDataSet,
+            HIVE_DB_NAME, "SET: " + targetDataSet));
+  }
+
+  private void addClusterRenameTransform(List<AttributeTransform> transforms,
+                                         String srcClusterName, String tgtClustername) {
+    transforms.add(create(HIVE_DB_CLUSTER_NAME, "EQUALS: " + srcClusterName,
+            HIVE_DB_CLUSTER_NAME, "SET: " + tgtClustername));
+  }
+
+  private void addReplicatedFrom(Map<String, String> options, String sourceClusterName) {
+    options.put(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM, sourceClusterName);
+  }
+
+  private void addClassificationTransform(List<AttributeTransform> transforms, String classificationName) {
+    transforms.add(create(TRANSFORM_ENTITY_SCOPE, "topLevel: ",
+            TRANSFORM_ENTITY_SCOPE, "ADD_CLASSIFICATION: " + classificationName));
+  }
+
+  private String sanitizeForClassificationName(String s) {
+    if (s == null || s.isEmpty()) {
+      return s;
+    }
+    return s.replace('-', '_').replace(' ', '_');
+  }
+
+  private void addClearReplicationAttributesTransform(List<AttributeTransform> transforms) {
+    Map<String, String> actions = new HashMap<>();
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_TO, "CLEAR:");
+    actions.put(TRANSFORM_ENTITY_SCOPE + "." + ATTRIBUTE_NAME_REPLICATED_FROM, "CLEAR:");
+
+    transforms.add(new AttributeTransform(null, actions));
+  }
+
+  private AttributeTransform create(String conditionLhs, String conditionRhs,
+                                    String actionLhs, String actionRhs) {
+    return new AttributeTransform(Collections.singletonMap(conditionLhs, conditionRhs),
+            Collections.singletonMap(actionLhs, actionRhs));
+  }
+
+  public String getAtlasClusterName(String clusterName) {

Review comment:
       May not be needed for Apache.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Builder for AtlasRestClient.
+ */
+public class AtlasRestClientBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientBuilder.class);
+  private static final String ATLAS_PROPERTY_CLIENT_HA_RETRIES_KEY = "atlas.client.ha.retries";
+  private static final String ATLAS_PROPERTY_CLIENT_HA_SLEEP_INTERVAL_MS_KEY = "atlas.client.ha.sleep.interval.ms";
+  private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
+  private static final String ATLAS_PROPERTY_AUTH_KERBEROS = "atlas.authentication.method.kerberos";
+  private static final String URL_SEPERATOR = ",";
+
+  private AuthStrategy authStrategy;
+  private UserGroupInformation userGroupInformation;
+  protected String incomingUrl;
+  protected String[] baseUrls;
+
+  enum AuthStrategy {
+    KERBEROS
+  }
+
+  public AtlasRestClientBuilder() {
+  }
+
+  public AtlasRestClient getClient(String atlasEndpoint, HiveConf conf) throws SemanticException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return new NoOpAtlasRestClient();
+    }
+    return baseUrl(atlasEndpoint).create();
+  }
+
+  private AtlasRestClientBuilder baseUrl(String urls) {
+    this.incomingUrl = urls;
+    if (urls.contains(URL_SEPERATOR)) {
+      this.baseUrls = urls.split(URL_SEPERATOR);
+    } else {
+      this.baseUrls = new String[]{urls};
+    }
+    return this;
+  }
+
+  private AtlasRestClientBuilder setAuthStrategy() throws SemanticException {
+    try {
+      authStrategy = AuthStrategy.KERBEROS;

Review comment:
       If Kerberos is the only strategy supported, do you need this?
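       A minimal sketch of what the builder could collapse to if Kerberos is the
       only supported path (the AuthStrategy enum and setAuthStrategy() would go
       away). The AtlasClientV2(UserGroupInformation, String, String...) constructor
       and the login-user handling are assumptions for illustration, not taken from
       this patch:

         private AtlasRestClient create() throws SemanticException {
           try {
             // Always authenticate as the current Kerberos login user; no strategy selection needed.
             UserGroupInformation ugi = UserGroupInformation.getLoginUser();
             AtlasClientV2 clientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), baseUrls);
             return new AtlasRestClientImpl(clientV2);
           } catch (Exception e) {
             throw new SemanticException(e);
           }
         }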

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.impexp.AtlasServer;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.sun.jersey.api.client.ClientResponse.Status.NOT_FOUND;
+
+/**
+ * Implementation of RESTClient, encapsulates Atlas' REST APIs.
+ */
+public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+  private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
+  private final AtlasClientV2 clientV2;
+
+  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+    this.clientV2 = clientV2;
+  }
+
+  private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {

Review comment:
       Do we need this? Can we reuse the existing retry and add some timeouts there?
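       For illustration, a rough sketch of how a per-attempt timeout could be folded
       into a shared retry helper instead of keeping a separate runWithTimeout() here;
       the method name and the attempt bound are illustrative, and the sketch reuses
       the java.util.concurrent imports already present in this file:

         protected <T> T invokeWithRetryAndTimeout(Callable<T> func, T defaultReturnValue,
                                                   long timeout, TimeUnit unit) throws Exception {
           int maxAttempts = 5;  // illustrative bound; a shared retry class would own this
           ExecutorService executor = Executors.newSingleThreadExecutor();
           try {
             for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               Future<T> future = executor.submit(func);
               try {
                 return future.get(timeout, unit);
               } catch (TimeoutException e) {
                 // Abandon this attempt and retry on timeout; other failures surface
                 // from future.get() as ExecutionException.
                 future.cancel(true);
                 LOG.warn("Atlas call timed out, attempt {} of {}", attempt, maxAttempts);
               }
             }
             return defaultReturnValue;
           } finally {
             executor.shutdownNow();
           }
         }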

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.atlas;
+
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.atlas.AtlasServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Implement retry logic for service calls.
+ */
+public class RetryingClient {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
+  private static final int PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT = (30 * 1000);
+  private static final int RETRY_COUNT_DEFAULT = 5;
+  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
+  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
+  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
+  private static final int MAX_RETY_COUNT = RETRY_COUNT_DEFAULT;
+  private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT;
+
+  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {

Review comment:
       Can we reuse existing retry classes?
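       For comparison, the core of the retry pattern in this class is roughly the
       following (a sketch built from the constants above; the error-message constants
       suggest the real invokeWithRetry() also special-cases conditions like
       "import or export is in progress" and the empty-ZIP import error, which are
       elided here):

         protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
           for (int attempt = 1; attempt <= RETRY_COUNT_DEFAULT; attempt++) {
             try {
               return func.call();
             } catch (AtlasServiceException | UniformInterfaceException e) {
               // Benign outcome: nothing to replicate, just return the caller's default.
               if (e.getMessage() != null && e.getMessage().contains(ERROR_MESSAGE_NO_ENTITIES)) {
                 return defaultReturnValue;
               }
               if (attempt == RETRY_COUNT_DEFAULT) {
                 throw e;
               }
               // Linearly growing pause between attempts (30s, 60s, ...).
               long pauseMs = (long) attempt * PAUSE_DURATION_INCREMENT_IN_MS;
               LOG.warn("Atlas call failed (attempt {} of {}), retrying in {} ms",
                   attempt, RETRY_COUNT_DEFAULT, pauseMs, e);
               Thread.sleep(pauseMs);
             }
           }
           return defaultReturnValue;
         }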




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 434887)
    Time Spent: 2.5h  (was: 2h 20m)

> Atlas metadata replication scheduling
> -------------------------------------
>
>                 Key: HIVE-23353
>                 URL: https://issues.apache.org/jira/browse/HIVE-23353
>             Project: Hive
>          Issue Type: Task
>            Reporter: PRAVIN KUMAR SINHA
>            Assignee: PRAVIN KUMAR SINHA
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23353.01.patch, HIVE-23353.02.patch, 
> HIVE-23353.03.patch
>
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
