This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit c90cb8b0de2abdc205a877b172cde9ceacb7571c Author: achandel-01 <[email protected]> AuthorDate: Mon Sep 22 16:38:07 2025 +0530 ATLAS-4920: Atlas Auto Purge Feature (#406) resolved comments comments resolved resolved comments updated UT's for PurgeService fixed ut's names fixed ut's names again updated uts updated uts,ui,function names check style changes Co-authored-by: Abhinav Chandel <[email protected]> (cherry picked from commit ca4e036de90dd02a0c62b89425f2e71f2f79e615) --- ...-base_model_add_atlas_operation_attributes.json | 19 + dashboardv2/public/js/utils/Enums.js | 3 +- .../js/views/audit/AdminAuditTableLayoutView.js | 10 +- dashboardv3/public/js/utils/Enums.js | 3 +- .../js/views/audit/AdminAuditTableLayoutView.js | 10 +- .../repository/graphdb/janus/AtlasJanusGraph.java | 1 - .../apache/atlas/model/audit/AtlasAuditEntry.java | 1 + .../repository/audit/EntityAuditListenerV2.java | 2 +- .../repository/store/graph/AtlasEntityStore.java | 8 + .../repository/store/graph/v1/DeleteHandlerV1.java | 159 +++++- .../store/graph/v2/AtlasEntityStoreV2.java | 47 ++ .../org/apache/atlas/services/PurgeService.java | 537 +++++++++++++++++++++ .../apache/atlas/services/PurgeServiceTest.java | 220 +++++++++ .../apache/atlas/web/resources/AdminResource.java | 70 ++- .../atlas/web/resources/AdminResourceTest.java | 4 +- 15 files changed, 1080 insertions(+), 14 deletions(-) diff --git a/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json new file mode 100644 index 000000000..5bfe974f5 --- /dev/null +++ b/addons/models/0000-Area0/patches/007-base_model_add_atlas_operation_attributes.json @@ -0,0 +1,19 @@ +{ + "patches": [ + { + "id": "TYPEDEF_PATCH_0007_001", + "description": "Add additional operations in Atlas", + "action": "UPDATE_ENUMDEF", + "typeName": "atlas_operation", + "applyToVersion": "1.1", + "updateToVersion": "1.2", + "params": null, + 
"elementDefs": [ + { + "ordinal": 10, + "value": "AUTO_PURGE" + } + ] + } + ] +} diff --git a/dashboardv2/public/js/utils/Enums.js b/dashboardv2/public/js/utils/Enums.js index 00bbe103d..fd3760ead 100644 --- a/dashboardv2/public/js/utils/Enums.js +++ b/dashboardv2/public/js/utils/Enums.js @@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) { BUSINESS_METADATA: "Business Metadata", PURGE: "Purge Entities", IMPORT: "Import Entities", - EXPORT: "Export Entities" + EXPORT: "Export Entities", + AUTO_PURGE : "Auto Purged Entities" } Enums.entityStateReadOnly = { diff --git a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js index f7918e82f..cc56e03cf 100644 --- a/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js +++ b/dashboardv2/public/js/views/audit/AdminAuditTableLayoutView.js @@ -244,7 +244,7 @@ define(['require', el.attr('colspan', '8'); if (results) { var adminValues = null; - if (operation == "PURGE") { + if (operation == "PURGE" || operation == "AUTO_PURGE") { adminText = that.displayPurgeAndImportAudits(auditData); } else if (operation == "EXPORT" || operation == "IMPORT") { adminText = that.displayExportAudits(auditData); @@ -353,7 +353,7 @@ define(['require', var adminValues = '<ul class="col-sm-6">', guids = null, adminTypDetails = Enums.category[obj.operation]; - if (obj.operation == "PURGE") { + if (obj.operation == "PURGE" || obj.operation == "AUTO_PURGE") { guids = obj.results ? obj.results.replace('[', '').replace(']', '').split(',') : guids; } else { guids = obj.model.get('params') ? 
obj.model.get('params').split(',') : guids; @@ -428,9 +428,13 @@ define(['require', onClickAdminPurgedEntity: function(e) { var that = this; require(['views/audit/AuditTableLayoutView'], function(AuditTableLayoutView) { + const titles = { + PURGE: "Purged Entity Details", + AUTO_PURGE: "Auto Purge Entity Details" + }; var obj = { guid: $(e.target).text(), - titleText: (e.target.dataset.operation == "PURGE") ? "Purged Entity Details: " : "Import Details: " + titleText: (titles[e.target.dataset.operation] || "Import Details") + ": " }, modalData = { title: obj.titleText + obj.guid, diff --git a/dashboardv3/public/js/utils/Enums.js b/dashboardv3/public/js/utils/Enums.js index 00bbe103d..fd3760ead 100644 --- a/dashboardv3/public/js/utils/Enums.js +++ b/dashboardv3/public/js/utils/Enums.js @@ -69,7 +69,8 @@ define(["require", "backbone"], function(require) { BUSINESS_METADATA: "Business Metadata", PURGE: "Purge Entities", IMPORT: "Import Entities", - EXPORT: "Export Entities" + EXPORT: "Export Entities", + AUTO_PURGE : "Auto Purged Entities" } Enums.entityStateReadOnly = { diff --git a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js index f7918e82f..cc56e03cf 100644 --- a/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js +++ b/dashboardv3/public/js/views/audit/AdminAuditTableLayoutView.js @@ -244,7 +244,7 @@ define(['require', el.attr('colspan', '8'); if (results) { var adminValues = null; - if (operation == "PURGE") { + if (operation == "PURGE" || operation == "AUTO_PURGE") { adminText = that.displayPurgeAndImportAudits(auditData); } else if (operation == "EXPORT" || operation == "IMPORT") { adminText = that.displayExportAudits(auditData); @@ -353,7 +353,7 @@ define(['require', var adminValues = '<ul class="col-sm-6">', guids = null, adminTypDetails = Enums.category[obj.operation]; - if (obj.operation == "PURGE") { + if (obj.operation == "PURGE" || obj.operation == 
"AUTO_PURGE") { guids = obj.results ? obj.results.replace('[', '').replace(']', '').split(',') : guids; } else { guids = obj.model.get('params') ? obj.model.get('params').split(',') : guids; @@ -428,9 +428,13 @@ define(['require', onClickAdminPurgedEntity: function(e) { var that = this; require(['views/audit/AuditTableLayoutView'], function(AuditTableLayoutView) { + const titles = { + PURGE: "Purged Entity Details", + AUTO_PURGE: "Auto Purge Entity Details" + }; var obj = { guid: $(e.target).text(), - titleText: (e.target.dataset.operation == "PURGE") ? "Purged Entity Details: " : "Import Details: " + titleText: (titles[e.target.dataset.operation] || "Import Details") + ": " }, modalData = { title: obj.titleText + obj.guid, diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index e5016eb70..e0d4857a1 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -446,7 +446,6 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE return uniqueKeyHandler; } - public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? 
extends Vertex> it) { return Iterables.transform(it, (Function<Vertex, AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)); } diff --git a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java index a103a1730..722c1a52e 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java @@ -136,6 +136,7 @@ public class AtlasAuditEntry extends AtlasBaseModelObject implements Serializabl public enum AuditOperation { PURGE("PURGE"), + AUTO_PURGE("AUTO_PURGE"), EXPORT("EXPORT"), IMPORT("IMPORT"), IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"), diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java index f0ac8a9ec..82a02200d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -407,7 +407,7 @@ public class EntityAuditListenerV2 implements EntityChangeListenerV2 { private EntityAuditEventV2 createEvent(EntityAuditEventV2 entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String details) { entityAuditEventV2.setEntityId(entity.getGuid()); entityAuditEventV2.setTimestamp(System.currentTimeMillis()); - entityAuditEventV2.setUser(RequestContext.get().getUser()); + entityAuditEventV2.setUser(RequestContext.get().getUser() != null ? 
RequestContext.get().getUser() : "admin"); entityAuditEventV2.setAction(action); entityAuditEventV2.setDetails(details); entityAuditEventV2.setEntity(entity); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 457f86575..6f13175ce 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -28,6 +28,7 @@ import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.store.graph.v2.EntityStream; import org.apache.atlas.type.AtlasEntityType; @@ -226,6 +227,13 @@ public interface AtlasEntityStore { */ EntityMutationResponse purgeByIds(Set<String> guids) throws AtlasBaseException; + /* + * Returns set of auto-purged entity guids + */ + EntityMutationResponse purgeEntitiesInBatch(Set<String> deletedVertices) throws AtlasBaseException; + + Set<AtlasVertex> accumulateDeletionCandidates(Set<String> vertices) throws AtlasBaseException; + /** * Add classification(s) */ diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 24c80c0a1..5d83f2934 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -63,6 +63,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import 
java.util.List; import java.util.Map; import java.util.Objects; @@ -77,18 +78,22 @@ import static org.apache.atlas.model.TypeCategory.STRUCT; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; import static org.apache.atlas.model.instance.AtlasEntity.Status.PURGED; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_DATASET; +import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_PROCESS; import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; import static org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS; import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL; import static org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER; import static org.apache.atlas.repository.Constants.EDGE_PENDING_TASKS_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; import static org.apache.atlas.repository.Constants.PROPAGATED_CLASSIFICATION_NAMES_KEY; import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges; import static org.apache.atlas.repository.graph.GraphHelper.getAssociatedEntityVertex; import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; @@ -127,6 +132,8 @@ public abstract class 
DeleteHandlerV1 { public static final Logger LOG = LoggerFactory.getLogger(DeleteHandlerV1.class); private static final boolean DEFERRED_ACTION_ENABLED = AtlasConfiguration.TASKS_USE_ENABLED.getBoolean(); + private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String RELATIONSHIP_ATTRIBUTE_KEY_STRING = "columnLineages"; protected final GraphHelper graphHelper; @@ -156,12 +163,22 @@ public abstract class DeleteHandlerV1 { * @throws AtlasBaseException */ public void deleteEntities(Collection<AtlasVertex> instanceVertices) throws AtlasBaseException { + Set<AtlasVertex> deletionCandidateVertices = accumulateDeletionCandidates(instanceVertices); + deleteTraitsAndVertices(deletionCandidateVertices); + } + + /* + accumulate the deletion candidates + */ + public Set<AtlasVertex> accumulateDeletionCandidates(Collection<AtlasVertex> instanceVertices) throws AtlasBaseException { final RequestContext requestContext = RequestContext.get(); final Set<AtlasVertex> deletionCandidateVertices = new HashSet<>(); final boolean isPurgeRequested = requestContext.isPurgeRequested(); + final Set<String> instanceVertexGuids = new HashSet<>(); for (AtlasVertex instanceVertex : instanceVertices) { final String guid = AtlasGraphUtilsV2.getIdFromVertex(instanceVertex); + instanceVertexGuids.add(guid); if (skipVertexForDelete(instanceVertex)) { if (LOG.isDebugEnabled()) { @@ -186,15 +203,155 @@ public abstract class DeleteHandlerV1 { requestContext.recordEntityDelete(entityHeader); deletionCandidateVertices.add(vertexInfo.getVertex()); } + + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(instanceVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (entityType != null && entityType.getEntityDef().hasSuperType(ATLAS_TYPE_DATASET)) { + addUpstreamProcessEntities(instanceVertex, deletionCandidateVertices, instanceVertexGuids); + } + + if (entityType != null && entityType.getEntityDef().hasSuperType(ATLAS_TYPE_PROCESS)) { +
getColumnLineageEntities(instanceVertex, deletionCandidateVertices); + } } + return deletionCandidateVertices; + } - // Delete traits and vertices. + /* + actually delete traits and then the vertex along its references + */ + public void deleteTraitsAndVertices(Collection<AtlasVertex> deletionCandidateVertices) throws AtlasBaseException { for (AtlasVertex deletionCandidateVertex : deletionCandidateVertices) { deleteAllClassifications(deletionCandidateVertex); deleteTypeVertex(deletionCandidateVertex, isInternalType(deletionCandidateVertex)); } } + public void addUpstreamProcessEntities(AtlasVertex entityVertex, Set<AtlasVertex> deletionCandidateVertices, Set<String> instanceVertexGuids) throws AtlasBaseException { + RequestContext requestContext = RequestContext.get(); + + Iterator<AtlasEdge> edgeIterator = GraphHelper.getIncomingEdgesByLabel(entityVertex, PROCESS_OUTPUTS_EDGE); + + String entityVertexGuid = entityVertex.getProperty(GUID_PROPERTY_KEY, String.class); + + while (edgeIterator.hasNext()) { + AtlasEdge edge = edgeIterator.next(); + AtlasVertex processVertex = edge.getOutVertex(); + + String guid = processVertex.getProperty(GUID_PROPERTY_KEY, String.class); + if (instanceVertexGuids.contains(guid)) { + continue; // already added; keep checking the other processes that produce this dataset + } + + boolean isEligible = isEligible(processVertex, entityVertexGuid, instanceVertexGuids); + + if (isEligible) { + instanceVertexGuids.add(guid); + + getColumnLineageEntities(processVertex, deletionCandidateVertices); + + for (GraphHelper.VertexInfo vertexInfo : getOwnedVertices(processVertex)) { + AtlasEntityHeader entityHeader = vertexInfo.getEntity(); + + if (requestContext.isPurgeRequested()) { + entityHeader.setClassifications(entityRetriever.getAllClassifications(vertexInfo.getVertex())); + } + + requestContext.recordEntityDelete(entityHeader); + deletionCandidateVertices.add(vertexInfo.getVertex()); + } + } + } + } + + public void getColumnLineageEntities(AtlasVertex process, Set<AtlasVertex> deletionCandidateVertices)
throws AtlasBaseException { + RequestContext requestContext = RequestContext.get(); + + AtlasEntityHeader entity = entityRetriever.toAtlasEntityHeader(process); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + Map<String, Map<String, AtlasAttribute>> relationshipAttributes = entityType != null ? entityType.getRelationshipAttributes() : Collections.emptyMap(); // unknown type: nothing to collect + Map<String, AtlasAttribute> columnLineages = relationshipAttributes.get(RELATIONSHIP_ATTRIBUTE_KEY_STRING); + + if (columnLineages != null && !columnLineages.isEmpty()) { + AtlasAttribute atlasAttribute = columnLineages.values().iterator().next(); + String relationshipEdgeLabel = atlasAttribute.getRelationshipEdgeLabel(); + + Iterator<AtlasEdge> edgeIterator = GraphHelper.getIncomingEdgesByLabel(process, relationshipEdgeLabel); + + int addedCount = 0; + + while (edgeIterator.hasNext()) { + AtlasVertex columnLineageVertex = edgeIterator.next().getOutVertex(); + String typeName = columnLineageVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class); + + if (LOG.isDebugEnabled()) { + LOG.debug("Column-lineage candidate: type={}, guid={}, state={}, viaEdgeLabel={}, process={}", + typeName, getGuid(columnLineageVertex), getState(columnLineageVertex), relationshipEdgeLabel, string(process)); + } + + AtlasEntityHeader columnLineageEntityHeader = entityRetriever.toAtlasEntityHeader(columnLineageVertex); + requestContext.recordEntityDelete(columnLineageEntityHeader); + deletionCandidateVertices.add(columnLineageVertex); + addedCount++; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Collected {} column-lineage vertices via '{}' for process {}", addedCount, relationshipEdgeLabel, getGuid(process)); + } + } + } + + boolean isEligible(AtlasVertex processVertex, String entityVertexGuid, Set<String> instanceVertexGuids) throws AtlasBaseException { + RequestContext requestContext = RequestContext.get(); + + if (requestContext.isPurgeRequested() && getState(processVertex) == ACTIVE) { + // skipping the active process entity
purging, for backward compatibility + return false; + } + + Iterator<AtlasEdge> processEdges = GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE); + + long countDeletedOutputs = 0; + long instanceDeleted = 0; // for handling the case of deletion + + while (processEdges.hasNext()) { + //process vertex can have the deleted outgoing edges apart from the current data_set entity + AtlasVertex outputEntity = processEdges.next().getInVertex(); + String outputEntityGuid = outputEntity.getProperty(GUID_PROPERTY_KEY, String.class); // output guid + + if (getState(outputEntity) == ACTIVE && !entityVertexGuid.equals(outputEntityGuid)) { + return false; + } + countDeletedOutputs++; + + if (requestContext.isPurgeRequested() && instanceVertexGuids.contains(outputEntityGuid)) { + instanceDeleted++; //for checking that if all outputs are in the instanceVertexGuids during purge + } + } + + if (requestContext.isPurgeRequested() && countDeletedOutputs > 1) { // ensuring process purging along only left deleted output + // but can skip process entity if the all datasets are to be purged in a single scheduled job + return countDeletedOutputs == instanceDeleted; + } + + return true; + } + + public static boolean isSoftDeletableProcess(AtlasVertex processVertex) { + Iterator<AtlasEdge> processEdges = GraphHelper.getOutGoingEdgesByLabel(processVertex, PROCESS_OUTPUTS_EDGE); + + while (processEdges.hasNext()) { + AtlasVertex output = processEdges.next().getInVertex(); + + if (getState(output) == ACTIVE) { + return false; + } + } + + return true; + } + /** * Delete the specified relationship edge.
* diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index cdfa8b542..3dd45f98f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -577,6 +577,53 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { return ret; } + @Override + @GraphTransaction + public EntityMutationResponse purgeEntitiesInBatch(Set<String> purgeCandidates) throws AtlasBaseException { + LOG.info("==> purgeEntitiesInBatch()"); + + Collection<AtlasVertex> purgeVertices = new ArrayList<>(); + EntityMutationResponse response = new EntityMutationResponse(); + + RequestContext requestContext = RequestContext.get(); + requestContext.setDeleteType(DeleteType.HARD); // hard deleter + requestContext.setPurgeRequested(true); + + for (String guid : purgeCandidates) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(graph, guid); + if (vertex != null) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeader(vertex); + purgeVertices.add(vertex); + response.addEntity(PURGE, entityHeader); + } + } + + deleteDelegate.getHandler().deleteTraitsAndVertices(purgeVertices); + + entityChangeNotifier.onEntitiesMutated(response, false); + + for (AtlasEntityHeader entity : response.getPurgedEntities()) { + LOG.info("Auto purged entity with guid {}", entity.getGuid()); + } + + LOG.info("<== purgeEntitiesInBatch()"); + + return response; + } + + @Override + public Set<AtlasVertex> accumulateDeletionCandidates(Set<String> guids) throws AtlasBaseException { + LOG.debug("==> accumulateDeletionCandidates() !"); // invoked once per purge candidate, so keep it out of INFO + Set<AtlasVertex> vertices = new HashSet<>(); + + for (String guid : guids) { + AtlasVertex vertex = entityRetriever.getEntityVertex(guid); + vertices.add(vertex); + } + + return
deleteDelegate.getHandler().accumulateDeletionCandidates(vertices); + } + @Override @GraphTransaction public void addClassifications(final String guid, final List<AtlasClassification> classifications) throws AtlasBaseException { diff --git a/repository/src/main/java/org/apache/atlas/services/PurgeService.java b/repository/src/main/java/org/apache/atlas/services/PurgeService.java new file mode 100644 index 000000000..8f5de1f72 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/services/PurgeService.java @@ -0,0 +1,537 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.atlas.services; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.DeleteType; +import org.apache.atlas.RequestContext; +import org.apache.atlas.annotation.AtlasService; +import org.apache.atlas.annotation.Timed; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.pc.WorkItemBuilder; +import org.apache.atlas.pc.WorkItemConsumer; +import org.apache.atlas.pc.WorkItemManager; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v1.DeleteHandlerV1; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.service.Service; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.stream.Collectors; + +import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.PURGE; +import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; +import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.VERTEX_INDEX; + +@AtlasService +@Order(9) +@Component +public class PurgeService implements Service { + private static final Logger LOG = LoggerFactory.getLogger(PurgeService.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("service.Purge"); + private final AtlasGraph atlasGraph; + private static Configuration atlasProperties; + private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; + + private static final String ENABLE_PROCESS_SOFT_DELETION = "atlas.enable.process.soft.delete"; + private static final boolean ENABLE_PROCESS_SOFT_DELETION_DEFAULT = false; + private static final String PURGE_ENABLED_SERVICE_TYPES = "atlas.purge.enabled.services"; + private static final String SOFT_DELETE_ENABLED_PROCESS_TYPES = "atlas.soft.delete.enabled.process.types"; + private static final String PURGE_BATCH_SIZE = "atlas.purge.batch.size"; + private static final int DEFAULT_PURGE_BATCH_SIZE = 1000; // fetching limit at a time + private static final String PURGE_WORKER_BATCH_SIZE = "atlas.purge.worker.batch.size"; + private static final int DEFAULT_PURGE_WORKER_BATCH_SIZE = 100; + private static final String CLEANUP_WORKER_BATCH_SIZE = "atlas.cleanup.worker.batch.size"; + private static final int DEFAULT_CLEANUP_WORKER_BATCH_SIZE = 100; + private static final String PURGE_RETENTION_PERIOD = "atlas.purge.deleted.entity.retention.days"; + private static final int PURGE_RETENTION_PERIOD_DEFAULT = 30; // days + private static final String PURGE_WORKERS_COUNT = "atlas.purge.workers.count"; + private static final int DEFAULT_PURGE_WORKERS_COUNT = 2; + private static final String CLEANUP_WORKERS_COUNT = "atlas.cleanup.workers.count"; + private static final int DEFAULT_CLEANUP_WORKERS_COUNT = 2; + private static final String 
PROCESS_ENTITY_CLEANER_THREAD_NAME = "Process-Entity-Cleaner"; + private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); + private static final int DEFAULT_CLEANUP_BATCH_SIZE = 1000; + private static final String CLEANUP_WORKERS_NAME = "Process-Cleanup-Worker"; + private static final String PURGE_WORKERS_NAME = "Entity-Purge-Worker"; + private static final String DELETED = "DELETED"; + private static final String ACTIVE = "ACTIVE"; + private static final String AND_STR = " AND "; + + static { + try { + atlasProperties = ApplicationProperties.get(); + } catch (Exception e) { + LOG.info("Failed to load application properties", e); + } + } + + @Inject + public PurgeService(AtlasGraph atlasgraph, AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { + this.atlasGraph = atlasgraph; + this.entityStore = entityStore; + this.typeRegistry = typeRegistry; + } + + @Override + public void start() throws AtlasException { + if (!getSoftDeletionFlag()) { + LOG.info("==> cleanup not enabled"); + return; + } + + LOG.info("==> PurgeService.start()"); + + launchCleanUp(); + + LOG.info("<== Launched the clean up thread"); + } + + @Override + public void stop() throws AtlasException { + LOG.info("==> stopping the purge service"); + } + + public void launchCleanUp() { + LOG.info("==> launching the new thread"); + + Thread thread = new Thread( + () -> { + long startTime = System.currentTimeMillis(); + LOG.info("==> {} started", PROCESS_ENTITY_CLEANER_THREAD_NAME); + softDeleteProcessEntities(); + LOG.info("==> exiting thread {}", PROCESS_ENTITY_CLEANER_THREAD_NAME); + long endTime = System.currentTimeMillis(); + LOG.info("==> completed cleanup {} seconds !", (endTime - startTime) / 1000); + }); + + thread.setName(PROCESS_ENTITY_CLEANER_THREAD_NAME); + thread.start(); + LOG.info("==> launched the thread for the clean up"); + } + + @SuppressWarnings("unchecked") + @Timed + public EntityMutationResponse purgeEntities() { + LOG.info("==> 
PurgeService.purgeEntities()"); + // index query of specific batch size + AtlasPerfTracer perf = null; + EntityMutationResponse entityMutationResponse = new EntityMutationResponse(); + RequestContext requestContext = RequestContext.get(); + requestContext.setDeleteType(DeleteType.HARD); // hard delete + requestContext.setPurgeRequested(true); + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "PurgeService.purgeEntities"); + } + + Set<String> allEligibleTypes = getEntityTypes(); + + try { + //bring n number of entities like 1000 at point of type Processes + WorkItemsQualifier wiq = createQualifier(typeRegistry, entityStore, atlasGraph, getPurgeWorkerBatchSize(), getPurgeWorkersCount(), true); + + String indexQuery = getBulkQueryString(allEligibleTypes, getPurgeRetentionPeriod()); + Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertices(0, getPurgeBatchSize()); + LOG.info("==> fetched Deleted entities"); + + if (!itr.hasNext()) { + LOG.info("==> no Purge Entities found"); + return entityMutationResponse; + } + + Set<String> producedDeletionCandidates = new HashSet<>(); // look up + + while (itr.hasNext()) { + AtlasVertex vertex = itr.next().getVertex(); + + if (vertex == null) { + continue; + } + + String guid = vertex.getProperty(GUID_PROPERTY_KEY, String.class); + + if (!producedDeletionCandidates.contains(guid)) { + Set<String> instanceVertex = new HashSet<>(); + instanceVertex.add(guid); + + Set<AtlasVertex> deletionCandidates = entityStore.accumulateDeletionCandidates(instanceVertex); + + for (AtlasVertex deletionCandidate : deletionCandidates) { + String deletionCandidateGuid = deletionCandidate.getProperty(GUID_PROPERTY_KEY, String.class); + if (!producedDeletionCandidates.contains(deletionCandidateGuid)) { + producedDeletionCandidates.add(deletionCandidateGuid); + wiq.checkProduce(deletionCandidate); + } + } + } + } + + wiq.shutdown(); + + // collecting all the results + 
Queue results = wiq.getResults(); + + LOG.info("==> Purged {} !", results.size()); + + while (!results.isEmpty()) { + AtlasEntityHeader entityHeader = (AtlasEntityHeader) results.poll(); + if (entityHeader == null) { + continue; + } + entityMutationResponse.addEntity(PURGE, entityHeader); + } + } catch (Exception ex) { + LOG.error("purge: failed!", ex); + } finally { + LOG.info("purge: Done!"); + } + } finally { + AtlasPerfTracer.log(perf); + } + + LOG.info("<== PurgeService.purgeEntities()"); + + return entityMutationResponse; + } + + @SuppressWarnings("unchecked") + @Timed + public void softDeleteProcessEntities() { + LOG.info("==> softDeleteProcessEntities()"); + + AtlasPerfTracer perf = null; + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "PurgeService.softDeleteProcessEntities"); + } + + Set<String> validProcessTypes = getProcessTypes(); + try { + //bring n number of entities like 1000 at point of type Processes + WorkItemsQualifier wiq = createQualifier(typeRegistry, entityStore, atlasGraph, getCleanupWorkerBatchSize(), getCleanUpWorkersCount(), false); + int offset = 0; + boolean moreResults = true; + + while (moreResults) { + String indexQuery = getBulkQueryString(validProcessTypes, 0); + Iterator<Result> itr = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertices(offset, DEFAULT_CLEANUP_BATCH_SIZE); + LOG.info("==> fetched entities"); + + if (!itr.hasNext()) { + moreResults = false; + } + + while (itr.hasNext()) { + AtlasVertex vertex = itr.next().getVertex(); + if (vertex != null) { + wiq.checkProduce(vertex); + } + } + + offset += DEFAULT_CLEANUP_BATCH_SIZE; + LOG.info("==> offset {}", offset); + } + + wiq.shutdown(); + } catch (Exception ex) { + LOG.error("cleanUp: failed!", ex); + } finally { + LOG.info("cleanUp: Done!"); + } + } finally { + AtlasPerfTracer.log(perf); + } + + LOG.info("<== softDeleteProcessEntities()"); + } + + static class EntityQualifier extends 
WorkItemConsumer<AtlasVertex> { + private final Set<String> batch = new HashSet<>(); + private final AtlasEntityStore entityStore; + private final AtlasTypeRegistry typeRegistry; + private final AtlasGraph atlasGraph; + private final boolean isPurgeEnabled; + private int batchesProcessed; + private int batchSize; + + public EntityQualifier(BlockingQueue<AtlasVertex> queue, AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasGraph atlasGraph, boolean isPurgeEnabled, int batchSize) { + super(queue); + this.typeRegistry = typeRegistry; + this.entityStore = entityStore; + this.atlasGraph = atlasGraph; + this.isPurgeEnabled = isPurgeEnabled; + this.batchesProcessed = 0; + this.batchSize = batchSize; + + if (isPurgeEnabled) { + LOG.info("==> consumers are purge enabled , batch size is {}", batchSize); + } else { + LOG.info("==> consumers are soft delete enabled , batch size is {}", batchSize); + } + } + + @Override + protected void processItem(AtlasVertex vertex) { + String guid = vertex.getProperty(GUID_PROPERTY_KEY, String.class); + LOG.info("==> processing the entity {}", guid); + + try { + if (!isPurgeEnabled && !isEligible(vertex)) { + return; + } + batch.add(guid); + commit(); + } catch (Exception ex) { + LOG.info("{}", ex.getMessage()); + } + } + + @Override + protected void doCommit() { + if (batch.size() == batchSize) { + attemptCommit(); + } + } + + @Override + protected void commitDirty() { + if (!batch.isEmpty()) { + attemptCommit(); + } + + super.commitDirty(); + } + + protected void attemptCommit() { + EntityMutationResponse res; + List<AtlasEntityHeader> results = Collections.emptyList(); + + try { + if (isPurgeEnabled) { + // purging not by directly + res = entityStore.purgeEntitiesInBatch(batch); + } else { + List<String> batchList = new ArrayList<>(batch); + res = entityStore.deleteByIds(batchList); + } + + results = isPurgeEnabled ? 
res.getPurgedEntities() : res.getDeletedEntities(); + + if (CollectionUtils.isEmpty(results)) { + return; + } + + for (AtlasEntityHeader entityHeader : results) { + addResult(entityHeader); // adding results + } + } catch (Exception e) { + LOG.info("==> Exception: {}", e.getMessage()); + } finally { + batchesProcessed++; + batch.clear(); + LOG.info("==> Processed {} batch number with total {} entities purged!", batchesProcessed, results.size()); + } + } + } + + static class EntityQualifierBuilder implements WorkItemBuilder<EntityQualifier, AtlasVertex> { + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityStore entityStore; + private final AtlasGraph atlasGraph; + private final boolean isPurgeEnabled; + private int batchSize; + + public EntityQualifierBuilder(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasGraph atlasGraph, boolean isPurgeEnabled, int batchSize) { + this.typeRegistry = typeRegistry; + this.entityStore = entityStore; + this.atlasGraph = atlasGraph; + this.isPurgeEnabled = isPurgeEnabled; + this.batchSize = batchSize; + } + + @Override + public EntityQualifier build(BlockingQueue<AtlasVertex> queue) { + return new EntityQualifier(queue, typeRegistry, entityStore, atlasGraph, isPurgeEnabled, batchSize); + } + } + + static class WorkItemsQualifier extends WorkItemManager<AtlasVertex, EntityQualifier> { + public WorkItemsQualifier(WorkItemBuilder builder, int batchSize, int numWorkers, boolean isPurgeEnabled) { + super(builder, isPurgeEnabled ? 
PURGE_WORKERS_NAME : CLEANUP_WORKERS_NAME, batchSize, numWorkers, true); + } + + @Override + public void shutdown() throws InterruptedException { + LOG.info("==> Shutting down manager!"); + super.shutdown(); + } + } + + public WorkItemsQualifier createQualifier(AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasGraph atlasGraph, int batchSize, int numWorkers, boolean isPurgeEnabled) { + EntityQualifierBuilder eqb = new EntityQualifierBuilder(typeRegistry, entityStore, atlasGraph, isPurgeEnabled, batchSize); + LOG.info("==> creating the purge entity producer"); + return new WorkItemsQualifier(eqb, batchSize, numWorkers, isPurgeEnabled); + } + + public static boolean isEligible(AtlasVertex vertex) { + return DeleteHandlerV1.isSoftDeletableProcess(vertex); + } + + private String getBulkQueryString(Set<String> typeNames, int retentionPeriod) { + String joinedTypes = typeNames.stream() + .map(t -> "\"" + t + "\"") + .collect(Collectors.joining(" OR ", "(", ")")); + + String indexQuery = getString(retentionPeriod, joinedTypes); + + LOG.info("bulk index query : {}", indexQuery); + return indexQuery; + } + + private long timeThresholdMillis(int retentionPeriod) { + long currentTimeMillis = System.currentTimeMillis(); + long retentionPeriodMillis = retentionPeriod * 24L * 60 * 60 * 1000; // Convert days to ms + return currentTimeMillis - retentionPeriodMillis; + } + + private String getString(int retentionDays, String joinedTypes) { + String baseQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\": " + joinedTypes + AND_STR + + indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\": (%s)"; + + String indexQuery = (retentionDays > 0) + ? 
String.format(baseQuery + AND_STR + indexSearchPrefix + "\"" + MODIFICATION_TIMESTAMP_PROPERTY_KEY + "\": [* TO %s]", DELETED, timeThresholdMillis(retentionDays)) + : String.format(baseQuery, ACTIVE); + + return indexQuery; + } + + public boolean getSoftDeletionFlag() { + if (atlasProperties != null) { + return atlasProperties.getBoolean(ENABLE_PROCESS_SOFT_DELETION, ENABLE_PROCESS_SOFT_DELETION_DEFAULT); + } + return false; + } + + private int getPurgeRetentionPeriod() { + int retentionPeriod = PURGE_RETENTION_PERIOD_DEFAULT; + + if (atlasProperties != null) { + retentionPeriod = atlasProperties.getInt(PURGE_RETENTION_PERIOD, PURGE_RETENTION_PERIOD_DEFAULT); + } + + return Math.max(PURGE_RETENTION_PERIOD_DEFAULT, retentionPeriod); // for enforcing the minimum retention period of 30 days + } + + private Set<String> getProcessTypes() { + Set<String> processTypes = new HashSet<>(); + + if (atlasProperties != null) { + String[] eligibleTypes = atlasProperties.getStringArray(SOFT_DELETE_ENABLED_PROCESS_TYPES); // e.g. hive, spark + for (String type : eligibleTypes) { + if (typeRegistry.isRegisteredType(type)) { + processTypes.add(type); + } + } + } + + return processTypes; + } + + public Set<String> getEntityTypes() { + Set<String> entityTypes = new HashSet<>(); + + if (atlasProperties != null) { + String[] eligibleServiceTypes = atlasProperties.getStringArray(PURGE_ENABLED_SERVICE_TYPES); // e.g. 
hive, spark + Set<String> serviceTypes = Arrays.stream(eligibleServiceTypes).collect(Collectors.toSet()); + + for (AtlasEntityDef entityDef : typeRegistry.getAllEntityDefs()) { + if (serviceTypes.contains(entityDef.getServiceType())) { + entityTypes.add(entityDef.getName()); + } + } + } + + return entityTypes; + } + + private int getPurgeBatchSize() { + if (atlasProperties != null) { + return atlasProperties.getInt(PURGE_BATCH_SIZE, DEFAULT_PURGE_BATCH_SIZE); + } + return DEFAULT_PURGE_BATCH_SIZE; + } + + private int getPurgeWorkersCount() { + if (atlasProperties != null) { + return atlasProperties.getInt(PURGE_WORKERS_COUNT, DEFAULT_PURGE_WORKERS_COUNT); + } + return DEFAULT_PURGE_WORKERS_COUNT; + } + + private int getCleanUpWorkersCount() { + if (atlasProperties != null) { + return atlasProperties.getInt(CLEANUP_WORKERS_COUNT, DEFAULT_CLEANUP_WORKERS_COUNT); + } + return DEFAULT_CLEANUP_WORKERS_COUNT; + } + + private int getPurgeWorkerBatchSize() { + if (atlasProperties != null) { + return atlasProperties.getInt(PURGE_WORKER_BATCH_SIZE, DEFAULT_PURGE_WORKER_BATCH_SIZE); + } + return DEFAULT_PURGE_WORKER_BATCH_SIZE; + } + + private int getCleanupWorkerBatchSize() { + if (atlasProperties != null) { + return atlasProperties.getInt(CLEANUP_WORKER_BATCH_SIZE, DEFAULT_CLEANUP_WORKER_BATCH_SIZE); + } + return DEFAULT_CLEANUP_WORKER_BATCH_SIZE; + } +} diff --git a/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java new file mode 100644 index 000000000..0f4e0482c --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/services/PurgeServiceTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.services; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.DeleteType; +import org.apache.atlas.RequestContext; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.AtlasTestBase; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasElement; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.repository.store.graph.v2.IAtlasEntityChangeNotifier; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.TestLoadModelUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; +import 
org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +@Guice(modules = org.apache.atlas.TestModules.TestOnlyModule.class) +public class PurgeServiceTest extends AtlasTestBase { + @Inject + private AtlasTypeDefStore typeDefStore; + + @Inject + private AtlasTypeRegistry typeRegistry; + + @Inject + private AtlasEntityStoreV2 entityStore; + + @Inject + private AtlasGraph atlasGraph; + + @BeforeClass + public void setup() throws Exception { + RequestContext.clear(); + super.initialize(); + TestLoadModelUtils.loadBaseModel(typeDefStore, typeRegistry); + TestLoadModelUtils.loadHiveModel(typeDefStore, typeRegistry); + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { } + } + + @Test + public void testPurgeEntities() throws Exception { + // Approach (pre-commit check on async flow): + // - Spy store's IAtlasEntityChangeNotifier and capture onEntitiesMutated(response,false) to assert + // the purged GUID before commit/locking paths. 
+ // Create DB and table + AtlasEntity db = newHiveDb(null); + String dbGuid = persistAndGetGuid(db); + AtlasEntity tbl = newHiveTable(db, null); + String tblGuid = persistAndGetGuid(tbl); + + // Soft-delete table + EntityMutationResponse del = entityStore.deleteByIds(Collections.singletonList(tblGuid)); + assertNotNull(del); + + // Backdate timestamp beyond default 30d retention and reindex to reflect in Solr + backdateModificationTimestamp(tblGuid, 31); + reindexVertices(tblGuid); + pauseForIndexCreation(); + + // Enable hive purge and single worker for determinism + ApplicationProperties.get().setProperty("atlas.purge.enabled.services", "hive"); + ApplicationProperties.get().setProperty("atlas.purge.workers.count", "1"); + ApplicationProperties.get().setProperty("atlas.purge.worker.batch.size", "1"); + + // Clear context to avoid skipping due to prior delete tracking + RequestContext.clear(); + + // Inject notifier spy to capture pre-commit signal (pre-commit verification point) + Object originalNotifier = injectNotifierSpy(entityStore); + + new PurgeService(atlasGraph, entityStore, typeRegistry).purgeEntities(); + + // Verify notifier call and capture response before transaction commit (async-safe via timeout) + IAtlasEntityChangeNotifier spy = (IAtlasEntityChangeNotifier) getNotifier(entityStore); + ArgumentCaptor<EntityMutationResponse> cap = ArgumentCaptor.forClass(EntityMutationResponse.class); + Mockito.verify(spy, Mockito.timeout(5000)).onEntitiesMutated(cap.capture(), Mockito.eq(false)); + + EntityMutationResponse notified = cap.getValue(); + assertNotNull(notified); + List<AtlasEntityHeader> purged = notified.getPurgedEntities(); + assertNotNull(purged); + + assertTrue(purged.stream().anyMatch(h -> tblGuid.equals(h.getGuid()))); + + // Restore original notifier to leave the store in a clean state + restoreNotifier(entityStore, originalNotifier); + + // Flag assertions + assertTrue(RequestContext.get().isPurgeRequested()); + 
assertEquals(RequestContext.get().getDeleteType(), DeleteType.HARD); + } + + private AtlasEntity newHiveDb(String nameOpt) { + String name = nameOpt != null ? nameOpt : RandomStringUtils.randomAlphanumeric(10); + AtlasEntity db = new AtlasEntity("hive_db"); + db.setAttribute("name", name); + db.setAttribute("qualifiedName", name); + db.setAttribute("clusterName", "cl1"); + db.setAttribute("location", "/tmp"); + db.setAttribute("description", "test db"); + return db; + } + + private AtlasEntity newHiveTable(AtlasEntity db, String nameOpt) { + String name = nameOpt != null ? nameOpt : RandomStringUtils.randomAlphanumeric(10); + AtlasEntity tbl = new AtlasEntity("hive_table"); + tbl.setAttribute("name", name); + tbl.setAttribute("qualifiedName", name); + tbl.setAttribute("description", "random table"); + tbl.setAttribute("type", "type"); + tbl.setAttribute("tableType", "MANAGED"); + tbl.setAttribute("db", AtlasTypeUtil.getAtlasObjectId(db)); + return tbl; + } + + private String persistAndGetGuid(AtlasEntity entity) throws AtlasBaseException { + EntityMutationResponse resp = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntityWithExtInfo(entity)), false); + String typeName = entity.getTypeName(); + AtlasEntityHeader hdr = resp.getFirstCreatedEntityByTypeName(typeName); + String guid = hdr != null ? hdr.getGuid() : null; + return guid; + } + + private void backdateModificationTimestamp(String guid, int days) { + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, guid); + if (v != null) { + long delta = days * 24L * 60 * 60 * 1000; + long ts = System.currentTimeMillis() - delta; + AtlasGraphUtilsV2.setProperty(v, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, ts); + } + } + + private void reindexVertices(String... 
guids) { + List<AtlasElement> elements = new ArrayList<>(); + for (String g : guids) { + if (g == null) { + continue; + } + AtlasVertex v = AtlasGraphUtilsV2.findByGuid(atlasGraph, g); + if (v != null) { + elements.add(v); + } + } + if (!elements.isEmpty()) { + try { + atlasGraph.getManagementSystem().reindex(Constants.VERTEX_INDEX, elements); + atlasGraph.getManagementSystem().reindex(Constants.FULLTEXT_INDEX, elements); + } catch (Exception ignored) { } + } + } + + // Test helper: swap store's private notifier field with a Mockito spy so we can + // capture and assert the pre-commit mutation response invoked by the store. + private Object injectNotifierSpy(AtlasEntityStoreV2 storeV2) throws Exception { + Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier"); + f.setAccessible(true); + Object original = f.get(storeV2); + Object spy = Mockito.spy(original); + f.set(storeV2, spy); + return original; + } + + // Test helper: fetch the (spied) notifier instance currently installed on the store. + private Object getNotifier(AtlasEntityStoreV2 storeV2) throws Exception { + Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier"); + f.setAccessible(true); + return f.get(storeV2); + } + + // Test helper: restore the original notifier instance after verification. 
+ private void restoreNotifier(AtlasEntityStoreV2 storeV2, Object original) throws Exception { + Field f = AtlasEntityStoreV2.class.getDeclaredField("entityChangeNotifier"); + f.setAccessible(true); + f.set(storeV2, original); + } +} diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 06ff05339..7064cc0da 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -68,6 +68,7 @@ import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.patches.AtlasPatchManager; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; +import org.apache.atlas.services.PurgeService; import org.apache.atlas.tasks.TaskManagement; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -121,6 +122,7 @@ import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -162,6 +164,10 @@ public class AdminResource { private static final List<String> TIMEZONE_LIST = Arrays.asList(TimeZone.getAvailableIDs()); private static final String METRICS_PERSIST_INTERVAL = "atlas.metrics.persist.schedule"; private static final String METRICS_PERSIST_INTERVAL_DEFAULT = "0 0 0/1 * * *"; // 1 hour interval + private static final String PURGE_CRON_EXPRESSION = "atlas.purge.cron.expression"; + private static final String PURGE_CRON_EXPRESSION_DEFAULT = "* * * 30 2 ?"; // disabled by default, user controlled scheduling only + private static final String ACTIVE = "ACTIVE"; + private static final String PURGE_THREAD_NAME = "Scheduled-Purge-Thread"; private static final Configuration atlasProperties; private final 
ServiceState serviceState; @@ -192,6 +198,8 @@ public class AdminResource { private final boolean isUiTasksTabEnabled; private final AtlasAuditReductionService auditReductionService; private Response version; + private final PurgeService purgeService; + private final ReentrantLock cronPurgeOperationLock; @Context private HttpServletRequest httpServletRequest; @@ -205,7 +213,8 @@ public class AdminResource { MigrationProgressService migrationProgressService, AtlasServerService serverService, ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore, AtlasPatchManager patchManager, AtlasAuditService auditService, EntityAuditRepository auditRepository, - TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService, AtlasMetricsUtil atlasMetricsUtil) { + TaskManagement taskManagement, AtlasDebugMetricsSink debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService, AtlasMetricsUtil atlasMetricsUtil, + PurgeService purgeService) { this.serviceState = serviceState; this.metricsService = metricsService; this.exportService = exportService; @@ -224,6 +233,8 @@ public class AdminResource { this.debugMetricsRESTSink = debugMetricsRESTSink; this.auditReductionService = atlasAuditReductionService; this.atlasMetricsUtil = atlasMetricsUtil; + this.purgeService = purgeService; + this.cronPurgeOperationLock = new ReentrantLock(); if (atlasProperties != null) { this.defaultUIVersion = atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V3); @@ -804,6 +815,63 @@ public class AdminResource { } } + @Scheduled(cron = "#{getPurgeCronExpression}") + public void schedulePurgeEntities() throws AtlasBaseException { + try { + Thread.currentThread().setName(PURGE_THREAD_NAME); + if (acquireCronPurgeOperationLock()) { + String state = serviceState.getState().toString(); + LOG.info("==> Status of current node is {}", state); + if (state.equals(ACTIVE)) { + LOG.info("==> Scheduled 
Purging has started"); + EntityMutationResponse entityMutationResponse = purgeService.purgeEntities(); + Set<String> guids = new HashSet<>(); + + final List<AtlasEntityHeader> purgedEntities = entityMutationResponse.getPurgedEntities() != null + ? entityMutationResponse.getPurgedEntities() + : Collections.emptyList(); + + if (CollectionUtils.isEmpty(purgedEntities)) { + LOG.info("==> no entities got purged"); + return; + } + + for (AtlasEntityHeader entityHeader : entityMutationResponse.getPurgedEntities()) { + guids.add(entityHeader.getGuid()); + } + + LOG.info("==> Purged Entities {}", purgedEntities.size()); + + auditService.add(AuditOperation.AUTO_PURGE, guids.toString(), entityMutationResponse.getPurgedEntitiesIds(), + entityMutationResponse.getPurgedEntities().size()); + + LOG.info("==> Scheduled Purging has finished"); + } else { + LOG.info("==> Current node is not active, so skipping the scheduled purge"); + } + } + } catch (Exception e) { + LOG.error("Error while purging entities", e); + throw new AtlasBaseException(e); + } finally { + RequestContext.clear(); + LOG.info("==> clearing the context"); + cronPurgeOperationLock.unlock(); + } + } + + @Bean + private String getPurgeCronExpression() { + if (atlasProperties != null) { + return atlasProperties.getString(PURGE_CRON_EXPRESSION, PURGE_CRON_EXPRESSION_DEFAULT); + } + return PURGE_CRON_EXPRESSION_DEFAULT; + } + + private boolean acquireCronPurgeOperationLock() { + return cronPurgeOperationLock.tryLock(); + } + @POST @Path("/importfile") @Produces(Servlets.JSON_MEDIA_TYPE) diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index f7e2c7ef7..aac34993e 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -48,7 +48,7 @@ public class AdminResourceTest { public void 
testStatusOfActiveServerIsReturned() throws IOException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); @@ -62,7 +62,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws IOException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();
