This is an automated email from the ASF dual-hosted git repository.
chaitalithombare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 0d00b7303 ATLAS-5055: Incremental Export: when an exported entity has a
tag propagated from a deleted entity, the tag is not propagated to it at the
target (#434)
0d00b7303 is described below
commit 0d00b7303e87aec67c56955fc34dc72ccf5ca665
Author: chaitalicod <[email protected]>
AuthorDate: Mon Sep 22 21:13:28 2025 +0530
ATLAS-5055: Incremental Export: when an exported entity has a tag propagated
from a deleted entity, the tag is not propagated to it at the target (#434)
Co-authored-by: chaitalithombare <[email protected]>
---
.../repository/store/graph/v1/DeleteHandlerV1.java | 10 ++++---
.../store/graph/v2/EntityGraphMapper.java | 13 +++++----
.../store/graph/v2/EntityGraphRetriever.java | 4 +--
.../store/graph/v2/tasks/ClassificationTask.java | 15 +++++++++++
.../ClassificationPropagationWithTasksTest.java | 30 +++++++++++++++++++++
.../src/test/resources/deleted_tab_propagation.zip | Bin 0 -> 26200 bytes
6 files changed, 60 insertions(+), 12 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
index 5d83f2934..378fd762a 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java
@@ -957,10 +957,10 @@ public abstract class DeleteHandlerV1 {
}
}
- public void createAndQueueTask(String taskType, AtlasVertex entityVertex,
String classificationVertexId, String relationshipGuid, String
classificationName) {
+ public void createAndQueueTask(String taskType, AtlasVertex entityVertex,
String classificationVertexId, String relationshipGuid, String
classificationName, boolean isImportInProgress) {
String currentUser = RequestContext.getCurrentUser();
String entityGuid = GraphHelper.getGuid(entityVertex);
- Map<String, Object> taskParams =
ClassificationTask.toParameters(entityGuid, classificationVertexId,
relationshipGuid, classificationName);
+ Map<String, Object> taskParams =
ClassificationTask.toParameters(entityGuid, classificationVertexId,
relationshipGuid, classificationName, isImportInProgress);
AtlasTask task = taskManagement.createTask(taskType,
currentUser, taskParams);
AtlasGraphUtilsV2.addEncodedProperty(entityVertex,
PENDING_TASKS_PROPERTY_KEY, task.getGuid());
@@ -1280,10 +1280,11 @@ public abstract class DeleteHandlerV1 {
private void addTagPropagation(AtlasVertex fromVertex, AtlasVertex
toVertex, AtlasEdge edge) throws AtlasBaseException {
final List<AtlasVertex> classificationVertices =
getPropagationEnabledClassificationVertices(fromVertex);
String relationshipGuid =
getRelationshipGuid(edge);
+ boolean isImportInProgress =
RequestContext.get().isImportInProgress();
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
for (AtlasVertex classificationVertex : classificationVertices) {
- createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex,
classificationVertex.getIdForDisplay(), relationshipGuid,
GraphHelper.getClassificationName(classificationVertex));
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD, toVertex,
classificationVertex.getIdForDisplay(), relationshipGuid,
GraphHelper.getClassificationName(classificationVertex), isImportInProgress);
}
} else {
final List<AtlasVertex> propagatedEntityVertices =
CollectionUtils.isNotEmpty(classificationVertices) ?
entityRetriever.getIncludedImpactedVerticesV2(toVertex, relationshipGuid) :
null;
@@ -1347,6 +1348,7 @@ public abstract class DeleteHandlerV1 {
*/
private void deleteAllClassifications(AtlasVertex instanceVertex) throws
AtlasBaseException {
List<AtlasEdge> classificationEdges =
getAllClassificationEdges(instanceVertex);
+ boolean isImportInProgress = RequestContext.get().isImportInProgress();
for (AtlasEdge edge : classificationEdges) {
AtlasVertex classificationVertex = edge.getInVertex();
@@ -1355,7 +1357,7 @@ public abstract class DeleteHandlerV1 {
if (isClassificationEdge && removePropagations) {
if (taskManagement != null && DEFERRED_ACTION_ENABLED) {
- createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
instanceVertex, classificationVertex.getIdForDisplay(), null,
GraphHelper.getClassificationName(classificationVertex));
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
instanceVertex, classificationVertex.getIdForDisplay(), null,
GraphHelper.getClassificationName(classificationVertex), isImportInProgress);
} else {
removeTagPropagation(classificationVertex);
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 02f86866f..3b7f4a510 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -792,6 +792,7 @@ public class EntityGraphMapper {
List<AtlasVertex>
entitiesToPropagateTo = null;
Map<AtlasClassification, HashSet<AtlasVertex>>
addedClassifications = new HashMap<>();
List<AtlasClassification> addClassifications
= new ArrayList<>(classifications.size());
+ boolean isImportInProgress
= RequestContext.get().isImportInProgress();
for (AtlasClassification c : classifications) {
AtlasClassification classification = new
AtlasClassification(c);
@@ -851,7 +852,7 @@ public class EntityGraphMapper {
if (propagateTags && taskManagement != null &&
deferredActionEnabled) {
propagateTags = false;
- createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD,
entityVertex, classificationVertex.getIdForDisplay(), classificationName);
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_ADD,
entityVertex, classificationVertex.getIdForDisplay(), classificationName,
isImportInProgress);
}
// add the attributes for the trait instance
@@ -981,6 +982,7 @@ public class EntityGraphMapper {
}
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph,
entityGuid);
+ boolean isImportInProgress = RequestContext.get().isImportInProgress();
if (entityVertex == null) {
throw new
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
@@ -1023,7 +1025,7 @@ public class EntityGraphMapper {
throw new
AtlasBaseException(AtlasErrorCode.DELETE_TAG_PROPAGATION_NOT_ALLOWED,
classificationVertexId, entityGuid);
}
- createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
entityVertex, classificationVertexId, classificationName);
+ createAndQueueTask(CLASSIFICATION_PROPAGATION_DELETE,
entityVertex, classificationVertexId, classificationName, isImportInProgress);
entityVertices = new ArrayList<>();
} else {
@@ -1101,6 +1103,7 @@ public class EntityGraphMapper {
}
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(this.graph,
guid);
+ boolean isImportInProgress = RequestContext.get().isImportInProgress();
if (entityVertex == null) {
throw new
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
@@ -1215,7 +1218,7 @@ public class EntityGraphMapper {
if (updatedTagPropagation != null && taskManagement != null &&
deferredActionEnabled) {
String propagationType = updatedTagPropagation ?
CLASSIFICATION_PROPAGATION_ADD : CLASSIFICATION_PROPAGATION_DELETE;
- createAndQueueTask(propagationType, entityVertex,
classificationVertex.getIdForDisplay(), classificationName);
+ createAndQueueTask(propagationType, entityVertex,
classificationVertex.getIdForDisplay(), classificationName, isImportInProgress);
updatedTagPropagation = null;
}
@@ -2834,7 +2837,7 @@ public class EntityGraphMapper {
attributes.put(bmAttribute.getName(), attrValue);
}
- private void createAndQueueTask(String taskType, AtlasVertex entityVertex,
String classificationVertexId, String classificationName) {
- deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex,
classificationVertexId, null, classificationName);
+ private void createAndQueueTask(String taskType, AtlasVertex entityVertex,
String classificationVertexId, String classificationName, boolean
isImportInProgress) {
+ deleteDelegate.getHandler().createAndQueueTask(taskType, entityVertex,
classificationVertexId, null, classificationName, isImportInProgress);
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 765aa7ddc..1043589b1 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.store.graph.v2;
import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
@@ -164,7 +163,6 @@ public class EntityGraphRetriever {
private static final String GLOSSARY_CATEGORY_HIERARCHY_EDGE_LABEL =
"r:AtlasGlossaryCategoryHierarchyLink";
private static final String GLOSSARY_CATEGORY_TYPE_NAME =
AtlasGlossaryCategory.class.getSimpleName();
private static final String PARENT_GLOSSARY_CATEGORY_GUID =
"parentCategoryGuid";
- private boolean deferredActionEnabled =
AtlasConfiguration.TASKS_USE_ENABLED.getBoolean();
private static final TypeReference<List<TimeBoundary>>
TIME_BOUNDARIES_LIST_TYPE = new TypeReference<List<TimeBoundary>>() {};
@@ -701,7 +699,7 @@ public class EntityGraphRetriever {
Iterable<AtlasEdge> propagationEdges =
entityVertex.getEdges(AtlasEdgeDirection.BOTH, tagPropagationEdges);
for (AtlasEdge propagationEdge : propagationEdges) {
- if (getEdgeStatus(propagationEdge) != ACTIVE &&
!deferredActionEnabled && !RequestContext.get().isImportInProgress()) {
+ if (getEdgeStatus(propagationEdge) != ACTIVE &&
!RequestContext.get().isImportInProgress()) {
continue;
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
index a614a7a84..1822581d4 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/ClassificationTask.java
@@ -50,6 +50,8 @@ public abstract class ClassificationTask extends AbstractTask
{
public static final String PARAM_RELATIONSHIP_GUID =
"relationshipGuid";
public static final String PARAM_RELATIONSHIP_OBJECT =
"relationshipObject";
public static final String PARAM_RELATIONSHIP_EDGE_ID =
"relationshipEdgeId";
+ public static final String PARAM_IMPORT_IN_PROGRESS =
"isImportInProgress";
+ public static final boolean DEFAULT_IMPORT_IN_PROGRESS = false;
protected final AtlasGraph graph;
protected final EntityGraphMapper entityGraphMapper;
@@ -66,12 +68,17 @@ public abstract class ClassificationTask extends
AbstractTask {
}
public static Map<String, Object> toParameters(String entityGuid, String
classificationVertexId, String relationshipGuid, String classificationName) {
+ return toParameters(entityGuid, classificationVertexId,
relationshipGuid, classificationName, false);
+ }
+
+ public static Map<String, Object> toParameters(String entityGuid, String
classificationVertexId, String relationshipGuid, String classificationName,
boolean isImportInProgress) {
Map<String, Object> ret = new HashMap<>();
ret.put(PARAM_ENTITY_GUID, entityGuid);
ret.put(PARAM_CLASSIFICATION_VERTEX_ID, classificationVertexId);
ret.put(PARAM_CLASSIFICATION_NAME, classificationName);
ret.put(PARAM_RELATIONSHIP_GUID, relationshipGuid);
+ ret.put(PARAM_IMPORT_IN_PROGRESS, isImportInProgress);
return ret;
}
@@ -119,6 +126,14 @@ public abstract class ClassificationTask extends
AbstractTask {
return FAILED;
}
+ Object obj = params.get(PARAM_IMPORT_IN_PROGRESS);
+ if (obj != null) {
+ LOG.debug("Task: {}: Setting import progress set to: {}",
getTaskGuid(), obj);
+ RequestContext.get().setImportInProgress((Boolean) obj);
+ }else {
+
RequestContext.get().setImportInProgress(DEFAULT_IMPORT_IN_PROGRESS);
+ }
+
RequestContext.get().setUser(userName, null);
try {
diff --git
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
index 702171cc3..2eaf6d57d 100644
---
a/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java
@@ -51,6 +51,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static
org.apache.atlas.repository.graph.GraphHelper.getPropagatedTraitNames;
import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
import static org.apache.atlas.utils.TestLoadModelUtils.loadModelFromJson;
import static org.testng.Assert.assertEquals;
@@ -63,8 +65,14 @@ import static org.testng.Assert.assertTrue;
public class ClassificationPropagationWithTasksTest extends AtlasTestBase {
private static final String IMPORT_FILE = "tag-propagation-data.zip";
+ private static final String IMPORT_DELETE_FILE =
"deleted_tab_propagation.zip";
+
private static final String HDFS_PATH_EMPLOYEES =
"a3955120-ac17-426f-a4af-972ec8690e5f";
+ private static final String HIVE_TABLE =
"089c1ad4-9dde-4f9e-80c8-12a3046be337";
+
+ private static final String HIVE_TABLE_CTAS =
"e83551b7-bbef-45aa-99d5-3d98c0ac737b";
+
@Inject
private AtlasTypeDefStore typeDefStore;
@@ -211,6 +219,28 @@ public class ClassificationPropagationWithTasksTest
extends AtlasTestBase {
assertNotNull(impactedEntities);
}
+ @Test
+ public void runImportForDeletedEntityLineage() throws Exception {
+ runImportWithNoParameters(importService,
getZipSource(IMPORT_DELETE_FILE));
+ final String tagName = "classification1";
+
+ AtlasEntity hiveTable = getEntity(HIVE_TABLE);
+ AtlasEntity hiveTableCtas = getEntity(HIVE_TABLE_CTAS);
+
+ AtlasVertex parentEntityVertex =
AtlasGraphUtilsV2.findByGuid(hiveTable.getGuid());
+
+ AtlasVertex entityVertex =
AtlasGraphUtilsV2.findByGuid(hiveTableCtas.getGuid());
+
+ AtlasVertex classificationVertex =
getClassificationVertex(parentEntityVertex, tagName);
+ assertNotNull(entityVertex);
+ assertNotNull(parentEntityVertex);
+ assertNotNull(classificationVertex);
+
+ List<String> propagatedTraitNames =
getPropagatedTraitNames(entityVertex);
+
+ assertNotNull(propagatedTraitNames);
+ }
+
private void loadModelFilesAndImportTestData() {
try {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore,
typeRegistry);
diff --git a/repository/src/test/resources/deleted_tab_propagation.zip
b/repository/src/test/resources/deleted_tab_propagation.zip
new file mode 100644
index 000000000..ace129350
Binary files /dev/null and
b/repository/src/test/resources/deleted_tab_propagation.zip differ