This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fe7d79c84 ATLAS-4903 : When migration restarts it results into
deletion of edges and vertices
fe7d79c84 is described below
commit fe7d79c841b7d294c9990d676672898102d22bb0
Author: chaitali <[email protected]>
AuthorDate: Wed Sep 18 23:18:20 2024 +0530
ATLAS-4903 : When migration restarts it results into deletion of edges and
vertices
Signed-off-by: Pinal Shah <[email protected]>
---
.../atlas/repository/store/graph/v2/EntityGraphMapper.java | 2 +-
.../repository/store/graph/v2/bulkimport/MigrationImport.java | 7 +++++--
.../repository/store/graph/v2/bulkimport/pc/EntityConsumer.java | 8 +++++---
.../store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java | 6 ++++--
server-api/src/main/java/org/apache/atlas/RequestContext.java | 9 +++++++++
5 files changed, 24 insertions(+), 8 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 6b395dd17..3f7f73e70 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -1458,7 +1458,7 @@ public class EntityGraphMapper {
}
if (isReference && !isSoftReference) {
- boolean isAppendOnPartialUpdate =
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
+ boolean isAppendOnPartialUpdate =
RequestContext.get().isMigrationInProgress() ||
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
if (isAppendOnPartialUpdate) {
allArrayElements = unionCurrentAndNewElements(attribute,
(List) currentElements, (List) newElementsCreated);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index f8c9218c6..d34d9b658 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -106,10 +106,13 @@ public class MigrationImport extends ImportStrategy {
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers =
getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
-
+ boolean isMigrationImport = false;
+ if (importResult.getRequest().getOptions().get("migration")!=null) {
+ isMigrationImport =
Boolean.valueOf(importResult.getRequest().getOptions().get("migration"));
+ }
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(typeRegistry, this.graph,
entityStore, entityGraphRetriever, graphBulk,
- entityStoreBulk, entityGraphRetrieverBulk, batchSize);
+ entityStoreBulk, entityGraphRetrieverBulk, batchSize,
isMigrationImport);
LOG.info("MigrationImport: EntityCreationManager: Created!");
return new EntityCreationManager(consumerBuilder, batchSize,
numWorkers, importResult, dataMigrationStatusService);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index b73988fd7..a22300f1b 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -21,7 +21,6 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.pc.WorkItemConsumer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -33,8 +32,8 @@ import
org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +56,7 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
private final AtlasEntityStore entityStoreBulk;
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityRetrieverBulk;
+ private final boolean isMigrationImport;
private List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new
ArrayList<>();
private List<String> localResults = new ArrayList<>();
@@ -64,7 +64,7 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
public EntityConsumer(AtlasTypeRegistry typeRegistry,
AtlasGraph atlasGraph, AtlasEntityStore entityStore,
AtlasGraph atlasGraphBulk, AtlasEntityStore
entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
- BlockingQueue queue, int batchSize) {
+ BlockingQueue queue, int batchSize , boolean
isMigrationImport) {
super(queue);
this.typeRegistry = typeRegistry;
@@ -76,6 +76,7 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
+ this.isMigrationImport = isMigrationImport;
}
@Override
@@ -98,6 +99,7 @@ public class EntityConsumer extends
WorkItemConsumer<AtlasEntity.AtlasEntityWith
private void processEntity(AtlasEntity.AtlasEntityWithExtInfo
entityWithExtInfo, long currentCount) {
RequestContext.get().setImportInProgress(true);
RequestContext.get().setCreateShellEntityForNonExistingReference(true);
+ RequestContext.get().setMigrationInProgress(this.isMigrationImport);
try {
LOG.debug("Processing: {}", currentCount);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
index 7eac8df73..b6836ee20 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -37,10 +37,11 @@ public class EntityConsumerBuilder implements
WorkItemBuilder<EntityConsumer, At
private final AtlasTypeRegistry typeRegistry;
private EntityGraphRetriever entityRetrieverBulk;
private int batchSize;
+ private final boolean isMigrationImport;
public EntityConsumerBuilder(AtlasTypeRegistry typeRegistry, AtlasGraph
atlasGraph, AtlasEntityStoreV2 entityStore, EntityGraphRetriever
entityRetriever,
AtlasGraph atlasGraphBulk, AtlasEntityStoreV2
entityStoreBulk, EntityGraphRetriever entityRetrieverBulk,
- int batchSize) {
+ int batchSize, boolean isMigrationImport) {
this.typeRegistry = typeRegistry;
this.atlasGraph = atlasGraph;
@@ -52,12 +53,13 @@ public class EntityConsumerBuilder implements
WorkItemBuilder<EntityConsumer, At
this.entityRetrieverBulk = entityRetrieverBulk;
this.batchSize = batchSize;
+ this.isMigrationImport = isMigrationImport;
}
@Override
public EntityConsumer
build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
return new EntityConsumer(typeRegistry, atlasGraph, entityStore,
atlasGraphBulk, entityStoreBulk, entityRetrieverBulk,
- queue, this.batchSize);
+ queue, this.batchSize, this.isMigrationImport);
}
}
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java
b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index e144d3650..e1670a924 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -74,6 +74,7 @@ public class RequestContext {
private int maxAttempts = 1;
private int attemptCount = 1;
private boolean isImportInProgress = false;
+ private boolean isMigrationInProgress = false;
private boolean isInNotificationProcessing = false;
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
@@ -202,6 +203,14 @@ public class RequestContext {
return isImportInProgress;
}
+ public boolean isMigrationInProgress() {
+ return isMigrationInProgress;
+ }
+
+ public void setMigrationInProgress(boolean migrationInProgress) {
+ isMigrationInProgress = migrationInProgress;
+ }
+
public void setImportInProgress(boolean importInProgress) {
isImportInProgress = importInProgress;
}