This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e6d9d7041 ATLAS-4861: Export/Import: add flag to skip updating
replicated attributes
e6d9d7041 is described below
commit e6d9d70411b239f1b59104c9ce55663764aa478a
Author: Pinal Shah <[email protected]>
AuthorDate: Fri May 10 17:29:19 2024 +0700
ATLAS-4861: Export/Import: add flag to skip updating replicated attributes
Signed-off-by: Pinal Shah <[email protected]>
---
.../apache/atlas/model/impexp/AtlasExportRequest.java | 18 ++++++++++++++++++
.../apache/atlas/model/impexp/AtlasImportRequest.java | 18 ++++++++++++++++++
.../apache/atlas/repository/impexp/AuditsWriter.java | 13 +++++++++----
3 files changed, 45 insertions(+), 4 deletions(-)
diff --git
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index 878b1d8bc..fc4d11d54 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -53,6 +53,7 @@ public class AtlasExportRequest implements Serializable {
public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
public static final String OPTION_SKIP_LINEAGE = "skipLineage";
public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
+ public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR =
"skipUpdateReplicationAttr";
public static final String FETCH_TYPE_FULL = "full";
public static final String FETCH_TYPE_CONNECTED = "connected";
public static final String FETCH_TYPE_INCREMENTAL = "incremental";
@@ -141,6 +142,23 @@ public class AtlasExportRequest implements Serializable {
return MapUtils.isNotEmpty(options) &&
options.containsKey(OPTION_KEY_REPLICATED_TO);
}
+ @JsonIgnore
+ public boolean skipUpdateReplicationAttr() {
+ if (MapUtils.isNotEmpty(getOptions()) &&
getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
+
+ Object o =
getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
+ if (o instanceof String) {
+ return Boolean.parseBoolean((String) o);
+ }
+
+ if (o instanceof Boolean) {
+ return (Boolean) o;
+ }
+
+ }
+ return false;
+ }
+
@JsonIgnore
public String getOptionKeyReplicatedTo() {
String replicateToServerName = isReplicationOptionSet() ? (String)
options.get(OPTION_KEY_REPLICATED_TO) : StringUtils.EMPTY;
diff --git
a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index cbc1aa938..12945ca2a 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -43,6 +43,7 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+ public static final String OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR =
"skipUpdateReplicationAttr";
public static final String OPTION_KEY_MIGRATION_FILE_NAME =
"migrationFileName";
public static final String OPTION_KEY_MIGRATION = "migration";
public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
@@ -122,6 +123,23 @@ public class AtlasImportRequest implements Serializable {
return MapUtils.isNotEmpty(options) &&
options.containsKey(OPTION_KEY_REPLICATED_FROM);
}
+ @JsonIgnore
+ public boolean skipUpdateReplicationAttr() {
+ if (MapUtils.isNotEmpty(getOptions()) &&
getOptions().containsKey(OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR)) {
+
+ Object o =
getOptions().get(AtlasExportRequest.OPTION_KEY_SKIP_UPDATE_REPLICATION_ATTR);
+ if (o instanceof String) {
+ return Boolean.parseBoolean((String) o);
+ }
+
+ if (o instanceof Boolean) {
+ return (Boolean) o;
+ }
+
+ }
+ return false;
+ }
+
@JsonIgnore
public String getOptionKeyReplicatedFrom() {
return isReplicationOptionSet() ?
options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index c4de0ed27..130092b44 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -97,6 +97,7 @@ public class AuditsWriter {
String attrNameReplicated,
long lastModifiedTimestamp) throws
AtlasBaseException {
if (!isReplicationSet || CollectionUtils.isEmpty(exportedGuids)) {
+ LOG.warn("Skipping Updating Replication Attributes");
return;
}
@@ -200,13 +201,15 @@ public class AuditsWriter {
private AtlasExportRequest request;
private String targetServerName;
private boolean replicationOptionState;
+ private boolean skipUpdateReplicationAttr;
private String targetServerFullName;
public void add(String userName, AtlasExportResult result,
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
- replicationOptionState = request.isReplicationOptionSet();
+ replicationOptionState = request.isReplicationOptionSet();
+ skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
saveCurrentServer();
@@ -220,7 +223,7 @@ public class AuditsWriter {
return;
}
- updateReplicationAttribute(replicationOptionState,
targetServerName, targetServerFullName,
+ updateReplicationAttribute((replicationOptionState &&
!skipUpdateReplicationAttr), targetServerName, targetServerFullName,
entityGuids, Constants.ATTR_NAME_REPLICATED_TO,
result.getChangeMarker());
}
}
@@ -228,6 +231,7 @@ public class AuditsWriter {
private class ImportAudits {
private AtlasImportRequest request;
private boolean replicationOptionState;
+ private boolean skipUpdateReplicationAttr;
private String sourceServerName;
private String sourceServerFullName;
@@ -235,7 +239,8 @@ public class AuditsWriter {
long startTime, long endTime,
List<String> entityGuids) throws AtlasBaseException {
request = result.getRequest();
- replicationOptionState = request.isReplicationOptionSet();
+ replicationOptionState = request.isReplicationOptionSet();
+ skipUpdateReplicationAttr = request.skipUpdateReplicationAttr();
saveCurrentServer();
@@ -250,7 +255,7 @@ public class AuditsWriter {
return;
}
- updateReplicationAttribute(replicationOptionState,
sourceServerName, sourceServerFullName, entityGuids,
+ updateReplicationAttribute((replicationOptionState &&
!skipUpdateReplicationAttr), sourceServerName, sourceServerFullName,
entityGuids,
Constants.ATTR_NAME_REPLICATED_FROM,
result.getExportResult().getChangeMarker());
}