This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f944f813a ATLAS-4768: Implement aging for audits stored by Atlas.
f944f813a is described below
commit f944f813a04ae91aba94f3622e1213da34dab30a
Author: radhikakundam <[email protected]>
AuthorDate: Fri Sep 29 10:39:51 2023 -0700
ATLAS-4768: Implement aging for audits stored by Atlas.
Signed-off-by: radhikakundam <[email protected]>
(cherry picked from commit 79ef3cce1be4874c17b296aff8b200c9edb42bcb)
---
.../main/java/org/apache/atlas/AtlasClientV2.java | 10 +
.../org/apache/atlas/repository/Constants.java | 23 ++
.../java/org/apache/atlas/AtlasConfiguration.java | 18 +-
.../atlas/model/audit/AuditReductionCriteria.java | 226 +++++++++++++
.../atlas/discovery/AtlasDiscoveryService.java | 18 +
.../atlas/discovery/EntityDiscoveryService.java | 60 ++++
.../audit/AbstractStorageBasedAuditRepository.java | 11 +-
.../audit/AtlasAuditReductionService.java | 363 +++++++++++++++++++++
.../repository/audit/EntityAuditRepository.java | 12 +
.../audit/HBaseBasedAuditRepository.java | 158 ++++++++-
.../audit/InMemoryEntityAuditRepository.java | 53 ++-
.../audit/NoopEntityAuditRepository.java | 7 +
.../repository/graph/GraphBackedSearchIndexer.java | 3 +
.../tasks/AuditReductionEntityRetrievalTask.java | 231 +++++++++++++
.../store/graph/v2/tasks/AuditReductionTask.java | 150 +++++++++
.../graph/v2/tasks/AuditReductionTaskFactory.java | 113 +++++++
.../repository/audit/AuditRepositoryTestBase.java | 67 +++-
.../resources/solr/core-template/solrconfig.xml | 2 +-
.../atlas/web/filters/ActiveServerFilter.java | 2 +-
.../apache/atlas/web/resources/AdminResource.java | 46 ++-
.../atlas/web/resources/AdminResourceTest.java | 4 +-
21 files changed, 1543 insertions(+), 34 deletions(-)
diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
index 6f97da192..6477b4091 100644
--- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
+++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java
@@ -31,6 +31,7 @@ import com.sun.jersey.multipart.file.StreamDataBodyPart;
import org.apache.atlas.bulkimport.BulkImportResponse;
import org.apache.atlas.model.SearchFilter;
import org.apache.atlas.model.audit.AtlasAuditEntry;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.discovery.AtlasQuickSearchResult;
@@ -838,6 +839,14 @@ public class AtlasClientV2 extends AtlasBaseClient {
});
}
+ public void ageoutAtlasAudits(AuditReductionCriteria
auditReductionCriteria, boolean useAuditConfig) throws AtlasServiceException {
+ MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl();
+
+ queryParams.add("useAuditConfig", String.valueOf(useAuditConfig));
+
+ callAPI(API_V2.AGEOUT_ATLAS_AUDITS, List.class,
auditReductionCriteria, queryParams);
+ }
+
// Glossary APIs
public List<AtlasGlossary> getAllGlossaries(String sortByAttribute, int
limit, int offset) throws AtlasServiceException {
@@ -1269,6 +1278,7 @@ public class AtlasClientV2 extends AtlasBaseClient {
// Admin APIs
public static final API_V2 GET_ATLAS_AUDITS = new
API_V2(ATLAS_AUDIT_API, HttpMethod.POST, Response.Status.OK);
+ public static final API_V2 AGEOUT_ATLAS_AUDITS = new
API_V2(ATLAS_AUDIT_API + "ageout/", HttpMethod.POST, Response.Status.OK);
// Glossary APIs
public static final API_V2 GET_ALL_GLOSSARIES = new
API_V2(GLOSSARY_URI, HttpMethod.GET, Response.Status.OK);
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 51b093284..2134133f5 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -239,6 +239,15 @@ public final class Constants {
public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime");
public static final String PROPERTY_KEY_INDEX_RECOVERY_CUSTOM_TIME =
encodePropertyKey(INDEX_RECOVERY_PREFIX + "customTime");
+ /**
+ * Audit Reduction vertex property keys.
+ */
+ public static final String AUDIT_REDUCTION_PREFIX =
INTERNAL_PROPERTY_KEY_PREFIX + "auditReduction_";
+ public static final String PROPERTY_KEY_AUDIT_REDUCTION_NAME =
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "name");
+ public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT =
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "default");
+ public static final String PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM =
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "custom");
+ public static final String PROPERTY_KEY_GUIDS_TO_SWEEPOUT =
encodePropertyKey(AUDIT_REDUCTION_PREFIX + "sweepout");
+
public static final String SQOOP_SOURCE = "sqoop";
public static final String FALCON_SOURCE = "falcon";
public static final String HBASE_SOURCE = "hbase";
@@ -248,6 +257,20 @@ public final class Constants {
public static final String STORM_SOURCE = "storm";
public static final String FILE_SPOOL_SOURCE = "file_spool";
+ /**
+ * Audit Reduction related constants
+ */
+ public enum AtlasAuditAgingType { DEFAULT, CUSTOM, SWEEP }
+ public static final String AUDIT_REDUCTION_TYPE_NAME =
"__auditReductionInfo";
+ public static final String AUDIT_AGING_TYPE_KEY =
"auditAgingType";
+ public static final String AUDIT_AGING_TTL_KEY = "ttl";
+ public static final String AUDIT_AGING_COUNT_KEY =
"auditCount";
+ public static final String AUDIT_AGING_ENTITY_TYPES_KEY =
"entityTypes";
+ public static final String AUDIT_AGING_ACTION_TYPES_KEY =
"actionTypes";
+ public static final String AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY =
"excludeEntityTypes";
+ public static final String CREATE_EVENTS_AGEOUT_ALLOWED_KEY =
"createEventsAgeoutAllowed";
+ public static final String AUDIT_AGING_SUBTYPES_INCLUDED_KEY =
"subTypesIncluded";
+
/*
* All supported file-format extensions for Bulk Imports through file upload
*/
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 58a2fa725..090889e1c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -93,7 +93,23 @@ public enum AtlasConfiguration {
SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1),
UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true),
METRICS_TIME_TO_LIVE_HOURS( "atlas.metrics.ttl.hours", 336), // 14 days default
- SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240); //10 days default
+ SOLR_INDEX_TX_LOG_TTL_CONF("write.ahead.log.ttl.in.hours", 240), //10 days default
+
+ ATLAS_AUDIT_AGING_ENABLED("atlas.audit.aging.enabled", false),
+ ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED("atlas.audit.default.ageout.enabled", true),
+ ATLAS_AUDIT_DEFAULT_AGEOUT_TTL("atlas.audit.default.ageout.ttl.in.days", 90),
+ ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT("atlas.audit.default.ageout.count", 0),
+ ATLAS_AUDIT_CUSTOM_AGEOUT_TTL("atlas.audit.custom.ageout.ttl.in.days", 0),
+ ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT("atlas.audit.custom.ageout.count", 0),
+ ATLAS_AUDIT_SWEEP_OUT("atlas.audit.sweep.out.enabled", false),
+ ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED("atlas.audit.create.events.ageout.allowed", false),
+ ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY("atlas.audit.aging.scheduler.frequency.in.days", 30),
+ ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED("atlas.audit.aging.subtypes.included", true),
+ MIN_TTL_TO_MAINTAIN("atlas.audit.min.ttl.to.maintain", 7),
+ MIN_AUDIT_COUNT_TO_MAINTAIN("atlas.audit.min.count.to.maintain", 50),
+ ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT("atlas.audit.aging.search.maxlimit", 10000),
+ ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL("atlas.audit.default.ageout.ignore.ttl", false),
+ ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION("atlas.audit.aging.ttl.test.automation", false); //Only for test automation
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java
new file mode 100644
index 000000000..c956b3ac8
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/audit/AuditReductionCriteria.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.audit;
+
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.io.Serializable;
+import java.util.*;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility =
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AuditReductionCriteria implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private boolean auditAgingEnabled = false;
+ private boolean defaultAgeoutEnabled = false;
+ private boolean auditSweepoutEnabled = false;
+ private boolean createEventsAgeoutAllowed = false;
+ private boolean subTypesIncluded = true;
+ private boolean ignoreDefaultAgeoutTTL = false;
+
+ private int defaultAgeoutAuditCount;
+ private int defaultAgeoutTTLInDays;
+ private int customAgeoutAuditCount;
+ private int customAgeoutTTLInDays;
+
+ private String customAgeoutEntityTypes;
+ private String customAgeoutActionTypes;
+
+ private String sweepoutEntityTypes;
+ private String sweepoutActionTypes;
+
+ public boolean isAuditAgingEnabled() {
+ return auditAgingEnabled;
+ }
+
+ public void setAuditAgingEnabled(boolean auditAgingEnabled) {
+ this.auditAgingEnabled = auditAgingEnabled;
+ }
+
+ public boolean isDefaultAgeoutEnabled() {
+ return defaultAgeoutEnabled;
+ }
+
+ public void setDefaultAgeoutEnabled(boolean defaultAgeoutEnabled) {
+ this.defaultAgeoutEnabled = defaultAgeoutEnabled;
+ }
+
+ public boolean isAuditSweepoutEnabled() {
+ return auditSweepoutEnabled;
+ }
+
+ public void setAuditSweepoutEnabled(boolean auditSweepoutEnabled) {
+ this.auditSweepoutEnabled = auditSweepoutEnabled;
+ }
+
+ public boolean isCreateEventsAgeoutAllowed() {
+ return createEventsAgeoutAllowed;
+ }
+
+ public void setCreateEventsAgeoutAllowed(boolean
createEventsAgeoutAllowed) {
+ this.createEventsAgeoutAllowed = createEventsAgeoutAllowed;
+ }
+
+ public boolean isSubTypesIncluded() {
+ return subTypesIncluded;
+ }
+
+ public void setSubTypesIncluded(boolean subTypesIncluded) {
+ this.subTypesIncluded = subTypesIncluded;
+ }
+
+ public boolean ignoreDefaultAgeoutTTL() {
+ return ignoreDefaultAgeoutTTL;
+ }
+
+ public void setIgnoreDefaultAgeoutTTL(boolean ignoreDefaultAgeoutTTL) {
+ this.ignoreDefaultAgeoutTTL = ignoreDefaultAgeoutTTL;
+ }
+
+ public int getDefaultAgeoutTTLInDays() {
+ return defaultAgeoutTTLInDays;
+ }
+
+ public void setDefaultAgeoutTTLInDays(int defaultAgeoutTTLInDays) {
+ this.defaultAgeoutTTLInDays = defaultAgeoutTTLInDays;
+ }
+
+ public int getDefaultAgeoutAuditCount() {
+ return defaultAgeoutAuditCount;
+ }
+
+ public void setDefaultAgeoutAuditCount(int defaultAgeoutAuditCount) {
+ this.defaultAgeoutAuditCount = defaultAgeoutAuditCount;
+ }
+
+ public int getCustomAgeoutTTLInDays() {
+ return customAgeoutTTLInDays;
+ }
+
+ public void setCustomAgeoutTTLInDays(int customAgeoutTTLInDays) {
+ this.customAgeoutTTLInDays = customAgeoutTTLInDays;
+ }
+
+ public int getCustomAgeoutAuditCount() {
+ return customAgeoutAuditCount;
+ }
+
+ public void setCustomAgeoutAuditCount(int customAgeoutAuditCount) {
+ this.customAgeoutAuditCount = customAgeoutAuditCount;
+ }
+
+ public String getCustomAgeoutEntityTypes() {
+ return customAgeoutEntityTypes;
+ }
+
+ public void setCustomAgeoutEntityTypes(String customAgeoutEntityTypes) {
+ this.customAgeoutEntityTypes = customAgeoutEntityTypes;
+ }
+
+ public String getCustomAgeoutActionTypes() {
+ return customAgeoutActionTypes;
+ }
+
+ public void setCustomAgeoutActionTypes(String customAgeoutActionTypes) {
+ this.customAgeoutActionTypes = customAgeoutActionTypes;
+ }
+
+ public String getSweepoutEntityTypes() {
+ return sweepoutEntityTypes;
+ }
+
+ public void setSweepoutEntityTypes(String sweepoutEntityTypes) {
+ this.sweepoutEntityTypes = sweepoutEntityTypes;
+ }
+
+ public String getSweepoutActionTypes() {
+ return sweepoutActionTypes;
+ }
+
+ public void setSweepoutActionTypes(String sweepoutActionTypes) {
+ this.sweepoutActionTypes = sweepoutActionTypes;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AuditReductionCriteria that = (AuditReductionCriteria) o;
+ return auditAgingEnabled == that.auditAgingEnabled &&
+ defaultAgeoutEnabled == that.defaultAgeoutEnabled &&
+ auditSweepoutEnabled == that.auditSweepoutEnabled &&
+ createEventsAgeoutAllowed == that.createEventsAgeoutAllowed &&
+ subTypesIncluded == that.subTypesIncluded &&
+ ignoreDefaultAgeoutTTL == that.ignoreDefaultAgeoutTTL &&
+ defaultAgeoutAuditCount == that.defaultAgeoutAuditCount &&
+ defaultAgeoutTTLInDays == that.defaultAgeoutTTLInDays &&
+ customAgeoutAuditCount == that.customAgeoutAuditCount &&
+ customAgeoutTTLInDays == that.customAgeoutTTLInDays &&
+ Objects.equals(customAgeoutEntityTypes,
that.customAgeoutEntityTypes) &&
+ Objects.equals(customAgeoutActionTypes,
that.customAgeoutActionTypes) &&
+ Objects.equals(sweepoutEntityTypes, that.sweepoutEntityTypes)
&&
+ Objects.equals(sweepoutActionTypes, that.sweepoutActionTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(auditAgingEnabled, defaultAgeoutEnabled,
auditSweepoutEnabled, createEventsAgeoutAllowed, subTypesIncluded,
ignoreDefaultAgeoutTTL, defaultAgeoutAuditCount, defaultAgeoutTTLInDays,
customAgeoutAuditCount, customAgeoutTTLInDays,
+ customAgeoutEntityTypes, customAgeoutActionTypes,
sweepoutEntityTypes, sweepoutActionTypes);
+ }
+
+ public StringBuilder toString(StringBuilder sb) {
+ if (sb == null) {
+ sb = new StringBuilder();
+ }
+
+ sb.append('{');
+ sb.append("auditAgingEnabled='").append(auditAgingEnabled).append('\'');
+ sb.append(", createEventsAgeoutAllowed='").append(createEventsAgeoutAllowed).append('\'');
+ sb.append(", subTypesIncluded='").append(subTypesIncluded).append('\'');
+ sb.append(", defaultAgeoutEnabled='").append(defaultAgeoutEnabled).append('\'');
+ sb.append(", ignoreDefaultAgeoutTTL='").append(ignoreDefaultAgeoutTTL).append('\'');
+ sb.append(", defaultAgeoutTTLInDays='").append(defaultAgeoutTTLInDays).append('\'');
+ sb.append(", defaultAgeoutAuditCount='").append(defaultAgeoutAuditCount).append('\'');
+ sb.append(", auditSweepoutEnabled='").append(auditSweepoutEnabled).append('\'');
+ sb.append(", customAgeoutAuditCount='").append(customAgeoutAuditCount).append('\'');
+ sb.append(", customAgeoutTTLInDays='").append(customAgeoutTTLInDays).append('\'');
+ sb.append(", customAgeoutEntityTypes=").append(customAgeoutEntityTypes);
+ sb.append(", customAgeoutActionTypes=").append(customAgeoutActionTypes);
+ sb.append(", sweepoutEntityTypes=").append(sweepoutEntityTypes);
+ sb.append(", sweepoutActionTypes=").append(sweepoutActionTypes);
+ sb.append('}');
+
+ return sb;
+ }
+
+ @Override
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+
+}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
index f8e55b886..acab4ce0a 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java
@@ -22,10 +22,13 @@ package org.apache.atlas.discovery;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.*;
import org.apache.atlas.model.profile.AtlasUserSavedSearch;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public interface AtlasDiscoveryService {
/**
@@ -79,6 +82,14 @@ public interface AtlasDiscoveryService {
*/
AtlasSearchResult searchWithParameters(SearchParameters searchParameters)
throws AtlasBaseException;
+ /**
+ * Search for guids of entities matching the search criteria
+ * @param searchParameters Search criteria
+ * @return GUIDs of matching entities
+ * @throws AtlasBaseException
+ */
+ Set<String> searchGUIDsWithParameters(AtlasAuditAgingType auditAgingType,
Set<String> entityTypes, SearchParameters searchParameters) throws
AtlasBaseException;
+
/**
* Search for relations (edges) matching the search criteria
* @param searchParameters Search criteria
@@ -173,4 +184,11 @@ public interface AtlasDiscoveryService {
* @throws IOException
*/
AtlasSearchResultDownloadStatus getSearchResultDownloadStatus() throws
IOException;
+
+ /**
+ * Creates task to age out audits
+ * @param taskParams parameters of AtlasTask
+ * @return Task created to perform audit aging
+ */
+ AtlasTask createAndQueueAuditReductionTask(Map<String, Object> taskParams,
String taskType) throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 5b4395355..9be6517e9 100644
--- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -50,6 +50,7 @@ import org.apache.atlas.query.executors.DSLQueryExecutor;
import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
import org.apache.atlas.query.executors.TraversalBasedExecutor;
import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasEdge;
@@ -60,6 +61,7 @@ import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory;
import
org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTask;
import
org.apache.atlas.repository.store.graph.v2.tasks.searchdownload.SearchResultDownloadTaskFactory;
import org.apache.atlas.repository.userprofile.UserProfileService;
@@ -467,6 +469,12 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return searchWithSearchContext(new SearchContext(searchParameters,
typeRegistry, graph, indexer.getVertexIndexKeys()));
}
+ @Override
+ @GraphTransaction
+ public Set<String> searchGUIDsWithParameters(AtlasAuditAgingType
auditAgingType, Set<String> entityTypes, SearchParameters searchParameters)
throws AtlasBaseException {
+ return searchEntityGUIDs(auditAgingType, entityTypes, new
SearchContext(searchParameters, typeRegistry, graph,
indexer.getVertexIndexKeys()));
+ }
+
@Override
@GraphTransaction
public void createAndQueueSearchResultDownloadTask(Map<String, Object>
taskParams) throws AtlasBaseException {
@@ -509,6 +517,19 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return result;
}
+ @Override
+ @GraphTransaction
+ public AtlasTask createAndQueueAuditReductionTask(Map<String, Object>
taskParams, String taskType) throws AtlasBaseException {
+ List<AtlasTask> pendingTasks =
taskManagement.getPendingTasksByType(taskType);
+ if (CollectionUtils.isNotEmpty(pendingTasks) && pendingTasks.size() >
AuditReductionTaskFactory.MAX_PENDING_TASKS_ALLOWED) {
+ throw new AtlasBaseException(PENDING_TASKS_ALREADY_IN_PROGRESS,
String.valueOf(pendingTasks.size()));
+ }
+ AtlasTask task = taskManagement.createTask(taskType,
RequestContext.getCurrentUser(), taskParams);
+ RequestContext.get().queueTask(task);
+
+ return task;
+ }
+
@Override
@GraphTransaction
public AtlasSearchResult
searchRelationsWithParameters(RelationshipSearchParameters searchParameters)
throws AtlasBaseException {
@@ -549,6 +570,45 @@ public class EntityDiscoveryService implements AtlasDiscoveryService {
return ret;
}
+ public Set<String> searchEntityGUIDs(AtlasAuditAgingType auditAgingType,
Set<String> entityTypes, SearchContext searchContext) throws AtlasBaseException
{
+ SearchParameters searchParameters =
searchContext.getSearchParameters();
+ final QueryParams params =
QueryParams.getNormalizedParams(searchParameters.getLimit(),searchParameters.getOffset());
+ String searchID = searchTracker.add(searchContext); // For future cancellations
+
+ searchParameters.setLimit(params.limit());
+ searchParameters.setOffset(params.offset());
+
+ Set<String> guids = new HashSet<>();
+ try {
+ List<AtlasVertex> resultList =
searchContext.getSearchProcessor().execute();
+ do {
+ for (AtlasVertex atlasVertex : resultList) {
+ if (atlasVertex != null && checkVertexMatchesSearchCriteria(atlasVertex, auditAgingType, entityTypes)) {
+ guids.add(atlasVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class));
+ }
+ }
+ searchParameters.setOffset(searchParameters.getOffset() +
searchParameters.getLimit());
+ resultList = searchContext.getSearchProcessor().execute();
+ } while (CollectionUtils.isNotEmpty(resultList));
+ LOG.info("Total {} entities are eligible for Audit aging",
guids.size());
+ } catch (Throwable t) {
+ LOG.error("Error while retrieving eligible entities for audit aging");
+ } finally {
+ searchTracker.remove(searchID);
+ }
+
+ return guids;
+ }
+
+ private boolean checkVertexMatchesSearchCriteria(AtlasVertex vertex,
AtlasAuditAgingType auditAgingType, Set<String> entityTypes) {
+ if (CollectionUtils.isEmpty(entityTypes)) {
+ return true;
+ }
+ String typeName =
vertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class);
+ boolean typeNameMatchesCriteria = entityTypes.contains(typeName);
+ return (auditAgingType == AtlasAuditAgingType.DEFAULT) ?
!typeNameMatchesCriteria : typeNameMatchesCriteria;
+ }
+
private AtlasSearchResult searchWithSearchContext(SearchContext
searchContext) throws AtlasBaseException {
SearchParameters searchParameters =
searchContext.getSearchParameters();
AtlasSearchResult ret = new
AtlasSearchResult(searchParameters);
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
index 1aac37577..1b3dd478e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java
@@ -24,6 +24,7 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.service.Service;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
@@ -31,10 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* This abstract base class should be used when adding support for an audit storage backend.
@@ -162,4 +160,9 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En
return Bytes.toBytes(keyStr);
}
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+ return null;
+ }
+
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java
new file mode 100644
index 000000000..3dcfe73fe
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditReductionService.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.audit;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.IntervalTask;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.atlas.repository.Constants.*;
+import static
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL;
+
+@Component
+public class AtlasAuditReductionService implements SchedulingConfigurer {
+ private static final Logger LOG =
LoggerFactory.getLogger(AtlasAuditReductionService.class);
+
+ private final AtlasGraph graph;
+ private final AtlasDiscoveryService discoveryService;
+ private final AtlasTypeRegistry typeRegistry;
+
+ private static final String VALUE_DELIMITER = ",";
+
+ private static final String ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES =
"atlas.audit.sweep.out.entity.types";
+ private static final String ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES =
"atlas.audit.sweep.out.action.types";
+
+ private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES =
"atlas.audit.custom.ageout.entity.types";
+ private static final String ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES =
"atlas.audit.custom.ageout.action.types";
+
+ private static final String ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY =
"atlas.audit.aging.scheduler.initial.delay.in.min";
+
+ private static final int MIN_TTL_TO_MAINTAIN =
AtlasConfiguration.MIN_TTL_TO_MAINTAIN.getInt();
+ private static final int MIN_AUDIT_COUNT_TO_MAINTAIN =
AtlasConfiguration.MIN_AUDIT_COUNT_TO_MAINTAIN.getInt();
+
+ private Configuration atlasConfiguration;
+ private AuditReductionCriteria ageoutCriteriaByConfig;
+ private List<Map<String, Object>> ageoutTypeCriteriaMap;
+
+ /**
+  * Creates the audit reduction service; every argument is stored as-is for later use.
+  *
+  * @param config           configuration used to read the list-valued aging properties
+  * @param graph            graph instance kept by this service
+  * @param discoveryService service used to create and queue audit reduction tasks
+  * @param typeRegistry     type registry kept by this service
+  */
+ @Inject
+ public AtlasAuditReductionService(Configuration config, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+     this.typeRegistry       = typeRegistry;
+     this.discoveryService   = discoveryService;
+     this.graph              = graph;
+     this.atlasConfiguration = config;
+ }
+
+ public List<AtlasTask> startAuditAgingByConfig() {
+ List<AtlasTask> tasks = null;
+ try {
+ if (ageoutCriteriaByConfig == null) {
+ ageoutCriteriaByConfig =
convertConfigToAuditReductionCriteria();
+ LOG.info("Audit aging is enabled by configuration");
+ }
+ LOG.info("Audit aging is triggered with configuration: {}",
ageoutCriteriaByConfig.toString());
+
+ if (ageoutTypeCriteriaMap == null) {
+ ageoutTypeCriteriaMap =
buildAgeoutCriteriaForAllAgingTypes(ageoutCriteriaByConfig);
+ }
+
+ tasks = startAuditAgingByCriteria(ageoutTypeCriteriaMap);
+
+ } catch (Exception e) {
+ LOG.error("Error while aging out audits by configuration: ", e);
+ }
+ return tasks;
+ }
+
+ public List<AtlasTask> startAuditAgingByCriteria(List<Map<String, Object>>
ageoutTypeCriteriaMap) {
+ if (CollectionUtils.isEmpty(ageoutTypeCriteriaMap)) {
+ return null;
+ }
+ List<AtlasTask> tasks = new ArrayList<>();
+ try {
+ for (Map<String, Object> eachCriteria : ageoutTypeCriteriaMap) {
+ AtlasTask auditAgingTask =
discoveryService.createAndQueueAuditReductionTask(eachCriteria,
ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL);
+ if (auditAgingTask != null) {
+ tasks.add(auditAgingTask);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error while aging out audits by criteria: ", e);
+ }
+ return tasks;
+ }
+
+ private AuditReductionCriteria convertConfigToAuditReductionCriteria() {
+ boolean auditAgingEnabled =
AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean();
+ boolean createAuditsAgeoutAllowed =
AtlasConfiguration.ATLAS_AUDIT_CREATE_EVENTS_AGEOUT_ALLOWED.getBoolean();
+ boolean subTypesIncluded =
AtlasConfiguration.ATLAS_AUDIT_AGING_SUBTYPES_INCLUDED.getBoolean();
+ boolean ignoreDefaultAgeoutTTL =
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getBoolean();
+
+ int defaultAgeoutTTLConfigured =
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt();
+ int defaultAgeoutAuditCountConfigured =
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT.getInt();
+
+ int customAgeoutTTLConfigured =
AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL.getInt();
+ int customAgeoutAuditCountConfigured =
AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT.getInt();
+
+ boolean defaultAgeoutEnabled =
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_ENABLED.getBoolean();
+ int defaultAgeoutTTL =
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL,
defaultAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
+ int defaultAgeoutAuditCount =
defaultAgeoutAuditCountConfigured <= 0 ? defaultAgeoutAuditCountConfigured
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT,
defaultAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
+
+ int customAgeoutTTL = customAgeoutTTLConfigured <= 0 ?
customAgeoutTTLConfigured
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL,
customAgeoutTTLConfigured, MIN_TTL_TO_MAINTAIN);
+ int customAgeoutAuditCount = customAgeoutAuditCountConfigured <=
0 ? customAgeoutAuditCountConfigured
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT,
customAgeoutAuditCountConfigured, MIN_AUDIT_COUNT_TO_MAINTAIN);
+ String customAgeoutEntityTypes =
getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ENTITY_TYPES);
+ String customAgeoutActionTypes =
getStringOf(ATLAS_AUDIT_CUSTOM_AGEOUT_ACTION_TYPES);
+
+ AuditReductionCriteria auditReductionCriteria = new
AuditReductionCriteria();
+
+ auditReductionCriteria.setAuditAgingEnabled(auditAgingEnabled);
+
auditReductionCriteria.setCreateEventsAgeoutAllowed(createAuditsAgeoutAllowed);
+ auditReductionCriteria.setSubTypesIncluded(subTypesIncluded);
+
auditReductionCriteria.setIgnoreDefaultAgeoutTTL(ignoreDefaultAgeoutTTL);
+
+ auditReductionCriteria.setDefaultAgeoutEnabled(defaultAgeoutEnabled);
+ auditReductionCriteria.setDefaultAgeoutTTLInDays(defaultAgeoutTTL);
+
auditReductionCriteria.setDefaultAgeoutAuditCount(defaultAgeoutAuditCount);
+
+ auditReductionCriteria.setCustomAgeoutTTLInDays(customAgeoutTTL);
+
auditReductionCriteria.setCustomAgeoutAuditCount(customAgeoutAuditCount);
+
auditReductionCriteria.setCustomAgeoutEntityTypes(customAgeoutEntityTypes);
+
auditReductionCriteria.setCustomAgeoutActionTypes(customAgeoutActionTypes);
+
+ boolean isSweepOutEnabled =
AtlasConfiguration.ATLAS_AUDIT_SWEEP_OUT.getBoolean();
+ auditReductionCriteria.setAuditSweepoutEnabled(isSweepOutEnabled);
+ if (isSweepOutEnabled) {
+ String sweepoutEntityTypes =
getStringOf(ATLAS_AUDIT_SWEEP_OUT_ENTITY_TYPES);
+ String sweepoutActionTypes =
getStringOf(ATLAS_AUDIT_SWEEP_OUT_ACTION_TYPES);
+ auditReductionCriteria.setSweepoutEntityTypes(sweepoutEntityTypes);
+ auditReductionCriteria.setSweepoutActionTypes(sweepoutActionTypes);
+ }
+
+ return auditReductionCriteria;
+ }
+
+ public List<Map<String, Object>>
buildAgeoutCriteriaForAllAgingTypes(AuditReductionCriteria
auditReductionCriteria) {
+ if (auditReductionCriteria == null ||
!auditReductionCriteria.isAuditAgingEnabled()) {
+ return null;
+ }
+ List<Map<String, Object>> auditAgeoutCriteriaByType = new
ArrayList<>();
+ Set<String> defaultAgeoutEntityTypesToExclude = new
HashSet<>();
+ Set<String> defaultAgeoutActionTypes =
Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x ->
x.toString()).collect(Collectors.toSet());
+
+ boolean createEventsAgeoutAllowed =
auditReductionCriteria.isCreateEventsAgeoutAllowed();
+ boolean subTypesIncluded =
auditReductionCriteria.isSubTypesIncluded();
+ boolean ignoreDefaultAgeoutTTL =
auditReductionCriteria.ignoreDefaultAgeoutTTL();
+
+ boolean defaultAgeoutEnabled =
auditReductionCriteria.isDefaultAgeoutEnabled();
+ int defaultAgeoutTTL = ignoreDefaultAgeoutTTL ? 0 :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL,
auditReductionCriteria.getDefaultAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
+ int defaultAgeoutAuditCount =
auditReductionCriteria.getDefaultAgeoutAuditCount() <= 0 ?
auditReductionCriteria.getDefaultAgeoutAuditCount()
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_COUNT,
auditReductionCriteria.getDefaultAgeoutAuditCount(),
MIN_AUDIT_COUNT_TO_MAINTAIN);
+
+ int customAgeoutTTL =
auditReductionCriteria.getCustomAgeoutTTLInDays() <= 0 ?
auditReductionCriteria.getCustomAgeoutTTLInDays()
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_TTL,
auditReductionCriteria.getCustomAgeoutTTLInDays(), MIN_TTL_TO_MAINTAIN);
+ int customAgeoutAuditCount =
auditReductionCriteria.getCustomAgeoutAuditCount() <= 0 ?
auditReductionCriteria.getCustomAgeoutAuditCount()
+ :
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_CUSTOM_AGEOUT_COUNT,
auditReductionCriteria.getCustomAgeoutAuditCount(),
MIN_AUDIT_COUNT_TO_MAINTAIN);
+ Set<String> customAgeoutEntityTypes =
getUniqueListOf(auditReductionCriteria.getCustomAgeoutEntityTypes());
+ Set<String> customAgeoutActionTypes =
getValidActionTypes(AtlasAuditAgingType.CUSTOM,
getUniqueListOf(auditReductionCriteria.getCustomAgeoutActionTypes()));
+
+ defaultAgeoutEntityTypesToExclude.addAll(customAgeoutEntityTypes);
+ if (CollectionUtils.isEmpty(customAgeoutEntityTypes)) {
+ defaultAgeoutActionTypes.removeAll(customAgeoutActionTypes);
+ }
+
+ boolean isSweepOutEnabled =
auditReductionCriteria.isAuditSweepoutEnabled();
+ if (isSweepOutEnabled) {
+ Set<String> sweepOutEntityTypes =
getUniqueListOf(auditReductionCriteria.getSweepoutEntityTypes());
+ Set<String> sweepOutActionTypes =
getValidActionTypes(AtlasAuditAgingType.SWEEP,
getUniqueListOf(auditReductionCriteria.getSweepoutActionTypes()));
+
+ if (CollectionUtils.isNotEmpty(sweepOutEntityTypes) ||
CollectionUtils.isNotEmpty(sweepOutActionTypes)) {
+ Map<String, Object> sweepAgeoutCriteria =
getAgeoutCriteriaMap(AtlasAuditAgingType.SWEEP, 0, 0, sweepOutEntityTypes,
sweepOutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
+ auditAgeoutCriteriaByType.add(sweepAgeoutCriteria);
+ } else {
+ LOG.error("Sweepout of audits is skipped.At least one of two
properties: entity types/action types should be configured.");
+ }
+
+ defaultAgeoutEntityTypesToExclude.addAll(sweepOutEntityTypes);
+ customAgeoutEntityTypes.removeAll(sweepOutEntityTypes);
+
+ if (CollectionUtils.isEmpty(sweepOutEntityTypes)) {
+ defaultAgeoutActionTypes.removeAll(sweepOutActionTypes);
+ customAgeoutActionTypes.removeAll(sweepOutActionTypes);
+ }
+ }
+
+ if ((customAgeoutTTL > 0 || customAgeoutAuditCount > 0) &&
(CollectionUtils.isNotEmpty(customAgeoutEntityTypes) ||
CollectionUtils.isNotEmpty(customAgeoutActionTypes))) {
+ Map<String, Object> customAgeoutCriteria =
getAgeoutCriteriaMap(AtlasAuditAgingType.CUSTOM, customAgeoutTTL,
customAgeoutAuditCount, customAgeoutEntityTypes, customAgeoutActionTypes,
createEventsAgeoutAllowed, subTypesIncluded);
+ auditAgeoutCriteriaByType.add(customAgeoutCriteria);
+ } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0 &&
CollectionUtils.isEmpty(customAgeoutEntityTypes) &&
CollectionUtils.isEmpty(customAgeoutActionTypes)) {
+ //Do Nothing
+ } else if (customAgeoutTTL <= 0 && customAgeoutAuditCount <= 0) {
+ LOG.error("Custom Audit aging is skipped.At least one of two
properties: TTL/Audit Count should be configured.");
+ } else {
+ LOG.error("Custom Audit aging is skipped.At least one of two
properties: entity types/action types should be configured.");
+ }
+
+ if (defaultAgeoutEnabled) {
+ if (ignoreDefaultAgeoutTTL) {
+ LOG.info("'{}' config property or 'ignoreDefaultAgeoutTTL'
property in API configured as: {}, Default audit aging will be done by audit
count only",
AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_IGNORE_TTL.getPropertyName(),
ignoreDefaultAgeoutTTL);
+ }
+
+ /**In case of default ageout with all available audit actions,
query to ATLAS_ENTITY_AUDIT_EVENTS table
+ * without any action type provides data for all audit actions and
is more performant than
+ * multiple queries to ATLAS_ENTITY_AUDIT_EVENTS with each action
type
+ */
+ if (defaultAgeoutActionTypes.size() ==
EntityAuditEventV2.EntityAuditActionV2.values().length) {
+ defaultAgeoutActionTypes.clear();
+ }
+ if (!ignoreDefaultAgeoutTTL || defaultAgeoutAuditCount > 0) {
+ Map<String, Object> defaultAgeoutCriteria =
getAgeoutCriteriaMap(AtlasAuditAgingType.DEFAULT, defaultAgeoutTTL,
defaultAgeoutAuditCount, defaultAgeoutEntityTypesToExclude,
defaultAgeoutActionTypes, createEventsAgeoutAllowed, subTypesIncluded);
+ auditAgeoutCriteriaByType.add(defaultAgeoutCriteria);
+ } else {
+ LOG.error("Default Audit aging is skipped. Valid audit count
should be configured when TTL criteria is ignored.");
+ }
+ }
+ return auditAgeoutCriteriaByType;
+ }
+
+ private Map<String, Object> getAgeoutCriteriaMap(AtlasAuditAgingType
agingOption, int ttl, int minCount, Set<String> entityTypes, Set<String>
actionTypes, boolean createEventsAgeoutAllowed, boolean subTypesIncluded) {
+ Map<String, Object> auditAgingOptions = new HashMap<>();
+ auditAgingOptions.put(AUDIT_AGING_TYPE_KEY, agingOption);
+ auditAgingOptions.put(AUDIT_AGING_TTL_KEY, ttl);
+ auditAgingOptions.put(AUDIT_AGING_COUNT_KEY, minCount);
+ auditAgingOptions.put(AUDIT_AGING_ENTITY_TYPES_KEY, entityTypes);
+ auditAgingOptions.put(AUDIT_AGING_ACTION_TYPES_KEY, actionTypes);
+ auditAgingOptions.put(CREATE_EVENTS_AGEOUT_ALLOWED_KEY,
createEventsAgeoutAllowed);
+ auditAgingOptions.put(AUDIT_AGING_SUBTYPES_INCLUDED_KEY,
subTypesIncluded);
+
+ return auditAgingOptions;
+ }
+
+ /**
+  * Clamps a configured value to a lower bound.
+  *
+  * @param configuration      property the value belongs to (used only for the log message)
+  * @param configuredValue    value as configured
+  * @param minValueToMaintain lower bound
+  * @return {@code configuredValue} if it is at least the bound, otherwise the bound
+  *         (in which case an info message is logged)
+  */
+ private int getGuaranteedMinValueOf(AtlasConfiguration configuration, int configuredValue, int minValueToMaintain) {
+     if (configuredValue >= minValueToMaintain) {
+         return configuredValue;
+     }
+
+     LOG.info("Minimum value for '{}' should be {}", configuration.getPropertyName(), minValueToMaintain);
+
+     return minValueToMaintain;
+ }
+
+ /**
+  * Reads a list-valued configuration property and joins its entries with
+  * {@code VALUE_DELIMITER}.
+  *
+  * Entries are converted with {@link String#valueOf(Object)} instead of being cast through a
+  * raw {@code List}, so a non-String entry no longer causes a {@link ClassCastException}.
+  *
+  * @param configProperty name of the property to read
+  * @return the joined value (empty string when the property has no entries), or {@code null}
+  *         when {@code configProperty} is null or empty
+  */
+ private String getStringOf(String configProperty) {
+     if (StringUtils.isEmpty(configProperty)) {
+         return null;
+     }
+
+     List<?> configuredValues = atlasConfiguration.getList(configProperty);
+
+     return configuredValues.stream()
+                            .map(value -> String.valueOf(value))
+                            .collect(Collectors.joining(VALUE_DELIMITER));
+ }
+
+ /**
+  * Splits a {@code VALUE_DELIMITER}-separated string into a set of trimmed, non-empty tokens.
+  *
+  * Empty tokens (from input such as {@code "a,,b"}, a trailing delimiter or whitespace-only
+  * entries) are dropped: an empty action-type token would otherwise prefix-match every audit
+  * action in {@code getValidActionTypes}.
+  *
+  * @param value delimited string; may be null or empty
+  * @return a new mutable {@link HashSet} (callers remove elements from it); never {@code null}
+  */
+ private Set<String> getUniqueListOf(String value) {
+     if (StringUtils.isEmpty(value)) {
+         return new HashSet<>();
+     }
+
+     return Stream.of(value.split(VALUE_DELIMITER))
+                  .map(String::trim)
+                  .filter(token -> !token.isEmpty())
+                  .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ /**
+  * Resolves configured action-type patterns to the names of matching
+  * {@code EntityAuditActionV2} constants.
+  *
+  * Pattern rules ({@code *} characters are removed before comparing):
+  * <ul>
+  *   <li>a pattern starting with {@code *} matches every action whose name contains the rest;</li>
+  *   <li>any other pattern matches every action whose name starts with it;</li>
+  *   <li>an empty pattern is treated as invalid rather than matching everything.</li>
+  * </ul>
+  * The leading-wildcard check is made on the original pattern; checking it after the
+  * {@code *} characters were stripped could never succeed.
+  *
+  * @param auditAgingType aging type the patterns were configured for (used for logging only)
+  * @param actionTypes    configured patterns; may be null or empty
+  * @return a new mutable set of matching action names; empty when no patterns were given
+  * @throws IllegalArgumentException when patterns were given but none of them matched any action
+  */
+ private Set<String> getValidActionTypes(AtlasAuditAgingType auditAgingType, Set<String> actionTypes) {
+     if (CollectionUtils.isEmpty(actionTypes)) {
+         // Mutable on purpose: callers call removeAll() on the returned set.
+         return new HashSet<>();
+     }
+
+     Set<String> allActionTypes     = Arrays.stream(EntityAuditEventV2.EntityAuditActionV2.values()).map(x -> x.toString()).collect(Collectors.toSet());
+     Set<String> entityAuditActions = new HashSet<>();
+     Set<String> invalidActionTypes = new HashSet<>();
+
+     for (String actionType : actionTypes) {
+         if (actionType.isEmpty()) {
+             invalidActionTypes.add(actionType);
+             continue;
+         }
+
+         final boolean matchAnywhere     = actionType.startsWith("*");
+         final String  actionTypeToMatch = actionType.replace("*", "");
+
+         Set<String> matchedActionTypes = allActionTypes.stream()
+                 .filter(x -> matchAnywhere ? x.contains(actionTypeToMatch) : x.startsWith(actionTypeToMatch))
+                 .collect(Collectors.toSet());
+
+         if (matchedActionTypes.isEmpty()) {
+             invalidActionTypes.add(actionType);
+         } else {
+             entityAuditActions.addAll(matchedActionTypes);
+         }
+     }
+
+     if (entityAuditActions.isEmpty()) {
+         throw new IllegalArgumentException("No enum constant " + EntityAuditEventV2.EntityAuditActionV2.class.getCanonicalName() + "." + String.join(VALUE_DELIMITER, invalidActionTypes));
+     }
+
+     LOG.info("Action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, entityAuditActions), auditAgingType);
+
+     if (!invalidActionTypes.isEmpty()) {
+         LOG.warn("Invalid action type name(s) {} provided for aging type-{}", String.join(VALUE_DELIMITER, invalidActionTypes), auditAgingType);
+     }
+
+     return entityAuditActions;
+ }
+
+ /**
+  * Registers the periodic audit aging job as a fixed-rate task, unless audit aging is
+  * disabled in configuration (a warning is logged in that case).
+  *
+  * @param taskRegistrar registrar that receives the task
+  */
+ @Override
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+     final boolean agingEnabled = AtlasConfiguration.ATLAS_AUDIT_AGING_ENABLED.getBoolean();
+
+     if (!agingEnabled) {
+         LOG.warn("Audit aging is not enabled");
+         return;
+     }
+
+     final long intervalMillis     = getAuditAgingFrequencyInMillis();
+     final long initialDelayMillis = getAuditAgingInitialDelayInMillis();
+
+     // The returned task list is not needed by the scheduler, so it is discarded.
+     final Runnable agingJob = this::startAuditAgingByConfig;
+
+     taskRegistrar.addFixedRateTask(new IntervalTask(agingJob, intervalMillis, initialDelayMillis));
+ }
+
+ private long getAuditAgingFrequencyInMillis() {
+ int frequencyInDays =
getGuaranteedMinValueOf(AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY,
AtlasConfiguration.ATLAS_AUDIT_AGING_SCHEDULER_FREQUENCY.getInt(), 1);
+ return frequencyInDays * DateUtils.MILLIS_PER_DAY;
+ }
+
+ /**
+  * @return the scheduler's initial delay in milliseconds; the configured value is in minutes,
+  *         defaults to 1 (also when application properties cannot be loaded) and is never
+  *         taken as less than 1
+  */
+ private long getAuditAgingInitialDelayInMillis() {
+     int delayInMinutes;
+
+     try {
+         delayInMinutes = ApplicationProperties.get().getInt(ATLAS_AUDIT_AGING_SCHEDULER_INITIAL_DELAY, 1);
+     } catch (AtlasException ex) {
+         LOG.error("Error while fetching application properties", ex);
+         delayInMinutes = 1;
+     }
+
+     return Math.max(1, delayInMinutes) * DateUtils.MILLIS_PER_MINUTE;
+ }
+}
\ No newline at end of file
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
index bad2a89fc..1fdacab29 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java
@@ -22,6 +22,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import java.util.List;
import java.util.Set;
@@ -92,6 +93,17 @@ public interface EntityAuditRepository {
*/
List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn,
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException;
+ /**
+ * Delete (age out) audit events of the given entity id, keeping only auditCount
+ * number of events with entityAuditActions, and return the events selected for deletion.
+ * NOTE(review): the parameter notes below follow the HBase-backed implementation in this
+ * change; other implementations may differ - confirm before relying on them.
+ * @param entityId entity id
+ * @param entityAuditActions operation(s) to be used to filter at HBase column;
+ * presumably null/empty means "all actions" - verify against the implementation in use
+ * @param auditCount Max numbers of events to keep without deleting
+ * @param ttlInDays time-to-live of events; in the HBase implementation a value &lt;= 0
+ * disables TTL based aging
+ * @param createEventsAgeoutAllowed whether entity-creation audit events may be aged out;
+ * in the HBase implementation, false excludes ENTITY_CREATE and ENTITY_IMPORT_CREATE events
+ * @param auditAgingType aging type (e.g. DEFAULT, CUSTOM, SWEEP) the deletion is performed for
+ * @return list of events that were selected for deletion
+ * @throws AtlasBaseException
+ * @throws AtlasException
+ */
+ List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException;
+
/***
* List events for given time range where classifications have been added,
deleted or updated.
* @param fromTimestamp from timestamp
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index a99f9b383..b7f0dd5e2 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.audit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.RequestContext;
@@ -28,10 +29,12 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -39,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -48,6 +52,7 @@ import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
@@ -63,14 +68,18 @@ import org.springframework.core.annotation.Order;
import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
/**
@@ -329,8 +338,13 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn,
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ return listEventsV2(entityId, auditAction, sortByColumn,
sortOrderDesc, offset, limit, false, true, false, null);
+ }
+
+ private List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn,
boolean sortOrderDesc, int offset, short limit, boolean isAgeoutTransaction,
boolean createEventsAgeoutAllowed, boolean allowAgeoutByAuditCount,
List<EntityAuditEventV2> eventsToKeep) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={},
auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
entityId, auditAction, sortByColumn, offset, limit);
+ LOG.debug("==> HBaseBasedAuditRepository.listEventsV2(entityId={},
auditAction={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
+ entityId, auditAction, sortByColumn, sortOrderDesc,
offset, limit);
}
AtlasPerfMetrics.MetricRecorder metric =
RequestContext.get().startMetricRecord("listEventsV2");
@@ -343,7 +357,7 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
offset = 0;
}
- if (limit < 0) {
+ if (!isAgeoutTransaction && limit < 0) {
limit = 100;
}
@@ -356,19 +370,29 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
* MultiRowRangeFilter and then again scan the table to get all
the columns from the table this time.
*/
Scan scan = new Scan().setReversed(true)
- .setCaching(DEFAULT_CACHING)
- .setSmall(true)
- .setStopRow(Bytes.toBytes(entityId))
- .setStartRow(getKey(entityId,
Long.MAX_VALUE, Integer.MAX_VALUE))
- .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
- .addColumn(COLUMN_FAMILY, COLUMN_USER);
-
+ .setCaching(DEFAULT_CACHING)
+ .setSmall(true)
+ .setStopRow(Bytes.toBytes(entityId))
+ .setStartRow(getKey(entityId, Long.MAX_VALUE,
Integer.MAX_VALUE))
+ .addColumn(COLUMN_FAMILY, COLUMN_ACTION)
+ .addColumn(COLUMN_FAMILY, COLUMN_USER);
+ FilterList filterList = new FilterList();
if (auditAction != null) {
Filter filterAction = new
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION,
CompareFilter.CompareOp.EQUAL, new
BinaryComparator(Bytes.toBytes(auditAction.toString())));
+ filterList.addFilter(filterAction);
+ }
- scan.setFilter(filterAction);
+ if (!createEventsAgeoutAllowed) {
+ FilterList createEventFilterList = new
FilterList(FilterList.Operator.MUST_PASS_ALL);
+ Filter filterByCreateActionType = new
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION,
CompareFilter.CompareOp.NOT_EQUAL, new
BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_CREATE.toString())));
+ Filter filterByImportCreateActionType = new
SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION,
CompareFilter.CompareOp.NOT_EQUAL, new
BinaryComparator(Bytes.toBytes(EntityAuditActionV2.ENTITY_IMPORT_CREATE.toString())));
+ createEventFilterList.addFilter(filterByCreateActionType);
+
createEventFilterList.addFilter(filterByImportCreateActionType);
+ filterList.addFilter(createEventFilterList);
}
+ scan.setFilter(filterList);
+
List<EntityAuditEventV2> events = new ArrayList<>();
try (ResultScanner scanner = table.getScanner(scan)) {
@@ -383,10 +407,25 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
}
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
+ int fromIndex = Math.min(events.size(), offset);
+ int endIndex = events.size();
+ if (limit > 0) {
+ endIndex = Math.min(events.size(), offset + limit);
+ }
+
+ if (isAgeoutTransaction) {
+ if (!allowAgeoutByAuditCount) { //No audit events allowed to
age-out by audit count
+ eventsToKeep.addAll(events);
+ return Collections.emptyList();
+ }
+ eventsToKeep.addAll(events.subList(0, fromIndex));
+ }
+
+ events = events.subList(fromIndex, endIndex);
- events = events.subList(Math.min(events.size(), offset),
Math.min(events.size(), offset + limit));
if (events.size() > 0) {
+
List<MultiRowRangeFilter.RowRange> ranges = new ArrayList<>();
events.forEach(e -> {
@@ -394,11 +433,11 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
});
scan = new Scan().setReversed(true)
- .setCaching(DEFAULT_CACHING)
- .setSmall(true)
- .setStopRow(Bytes.toBytes(entityId))
- .setStartRow(getKey(entityId, Long.MAX_VALUE,
Integer.MAX_VALUE))
- .setFilter(new MultiRowRangeFilter(ranges));
+ .setCaching(DEFAULT_CACHING)
+ .setSmall(true)
+ .setStopRow(Bytes.toBytes(entityId))
+ .setStartRow(getKey(entityId, Long.MAX_VALUE,
Integer.MAX_VALUE))
+ .setFilter(new MultiRowRangeFilter(ranges));
try (ResultScanner scanner = table.getScanner(scan)) {
events = new ArrayList<>();
@@ -426,7 +465,8 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
}
if (LOG.isDebugEnabled()) {
- LOG.debug("<==
HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={},
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #recored returned {}",
entityId, auditAction, sortByColumn, offset, limit, events.size());
+ LOG.debug("<==
HBaseBasedAuditRepository.listEventsV2(entityId={}, auditAction={},
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): #records returned {}",
+ entityId, auditAction, sortByColumn, sortOrderDesc,
offset, limit, events.size());
}
return events;
@@ -452,6 +492,88 @@ public class HBaseBasedAuditRepository extends
AbstractStorageBasedAuditReposito
return ret;
}
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
allowedAuditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+ final String SORT_BY_COLUMN =
EntityAuditEventV2.SORT_COLUMN_TIMESTAMP;
+ final boolean SORT_ORDER_DESC = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> HBaseBasedAuditRepository.deleteEventsV2(entityId={},
auditActions={}, sortByColumn={}, sortOrderDesc={}, offset={}, limit={})",
+ entityId, Arrays.toString(entityAuditActions.toArray()),
SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
+ }
+
+ AtlasPerfMetrics.MetricRecorder metric =
RequestContext.get().startMetricRecord("deleteEventsV2");
+
+ Table table = null;
+ List<EntityAuditEventV2> eventsEligibleForAgeout = new ArrayList<>();
+ try {
+ table = connection.getTable(tableName);
+ List<EntityAuditEventV2> eventsToKeep = new ArrayList<>();
+ boolean allowAgeoutByAuditCount = allowedAuditCount > 0 ||
(auditAgingType == AtlasAuditAgingType.SWEEP);
+ if (CollectionUtils.isEmpty(entityAuditActions)) {
+ eventsEligibleForAgeout.addAll(listEventsV2(entityId, null,
SORT_BY_COLUMN, SORT_ORDER_DESC,
+ allowedAuditCount, (short)
-1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep));
+ } else {
+ for (EntityAuditActionV2 eachAuditAction : entityAuditActions)
{
+ List<EntityAuditEventV2> eventsByEachAuditAction =
listEventsV2(entityId, eachAuditAction, SORT_BY_COLUMN, SORT_ORDER_DESC,
+ allowedAuditCount, (short)
-1, true, createEventsAgeoutAllowed, allowAgeoutByAuditCount, eventsToKeep);
+
+ if (CollectionUtils.isNotEmpty(eventsByEachAuditAction)) {
+
eventsEligibleForAgeout.addAll(eventsByEachAuditAction);
+ }
+ }
+ }
+
+ if (CollectionUtils.isNotEmpty(eventsToKeep)) {
+ //Limit events based on configured audit count by grouping
events of all action types
+ if (allowAgeoutByAuditCount && (auditAgingType ==
AtlasAuditAgingType.DEFAULT || CollectionUtils.isEmpty(entityAuditActions))) {
+ LOG.debug("Aging out audit events by audit count for
entity: {}", entityId);
+ EntityAuditEventV2.sortEvents(eventsToKeep,
SORT_BY_COLUMN, SORT_ORDER_DESC);
+ if (allowedAuditCount < eventsToKeep.size()) {
+
eventsEligibleForAgeout.addAll(eventsToKeep.subList(allowedAuditCount,
eventsToKeep.size()));
+ eventsToKeep = eventsToKeep.subList(0,
allowedAuditCount);
+ }
+ }
+ //TTL based aging
+ LocalDateTime now = LocalDateTime.now();
+ boolean isTTLTestAutomation =
AtlasConfiguration.ATLAS_AUDIT_AGING_TTL_TEST_AUTOMATION.getBoolean();
+ if (ttlInDays > 0) {
+ LOG.debug("Aging out audit events by TTL for entity: {}",
entityId);
+ long ttlTimestamp = Timestamp.valueOf(isTTLTestAutomation
? now.minusMinutes(ttlInDays) : now.minusDays(ttlInDays)).getTime();
+ eventsToKeep.forEach(e -> {
+ if (e.getTimestamp() < ttlTimestamp) {
+ eventsEligibleForAgeout.add(e);
+ }
+ });
+ }
+ }
+
+ List<Delete> eventsToDelete = new ArrayList<>();
+ for (EntityAuditEventV2 event : eventsEligibleForAgeout) {
+ Delete delete = new Delete(Bytes.toBytes(event.getEventKey()));
+ eventsToDelete.add(delete);
+ }
+
+ if (CollectionUtils.isNotEmpty(eventsToDelete)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting events from table:{} are {}",
tableName, Arrays.toString(eventsToDelete.toArray()));
+ }
+ table.delete(eventsToDelete);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<==
HBaseBasedAuditRepository.deleteEventsV2(entityId={}, auditAction={},
sortByColumn={}, sortOrderDesc={}, offset={}, limit={}): ",
+ entityId, Arrays.toString(entityAuditActions.toArray()),
SORT_BY_COLUMN, SORT_ORDER_DESC, allowedAuditCount, -1);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed deleting audit events for guid:{}", entityId);
+ } finally {
+ RequestContext.get().endMetricRecord(metric);
+ close(table);
+ }
+ return eventsEligibleForAgeout;
+ }
+
private <T> void addColumn(Put put, byte[] columnName, T columnValue) {
if (columnValue != null && !columnValue.toString().isEmpty()) {
put.addColumn(COLUMN_FAMILY, columnName,
Bytes.toBytes(columnValue.toString()));
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
index 51efff1d3..6d31e9727 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java
@@ -23,10 +23,13 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -103,17 +106,59 @@ public class InMemoryEntityAuditRepository implements
EntityAuditRepository {
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn,
boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException {
+ // Plain listing: delegates to the private overload with ttlInDays = 0 (no TTL handling),
+ // allowMaxResults = true (a limit of -1 means "all matching events") and
+ // createEventsAgeoutAllowed = true (ENTITY_CREATE events are not filtered out).
+ return listEventsV2(entityId, auditAction, sortByColumn,
sortOrderDesc, 0, offset, limit, true, true);
+ }
+
+ /**
+  * Collects the V2 audit events stored for one entity, optionally filtered by action, sorts them
+  * and returns the page [offset, offset + limit).
+  * When ttlInDays > 0, events skipped by the offset whose timestamp is not newer than
+  * (now - ttlInDays) are appended to the result as well.
+  *
+  * @param entityId                  id of the entity whose events are read
+  * @param auditAction               action to filter on, or null for every action
+  * @param sortByColumn              column passed to EntityAuditEventV2.sortEvents
+  * @param sortOrderDesc             true to sort in descending order
+  * @param ttlInDays                 TTL in days; values <= 0 disable the TTL handling
+  * @param offset                    index of the first event of the page
+  * @param limit                     page size; -1 together with allowMaxResults means "all events"
+  * @param allowMaxResults           whether a limit of -1 is expanded to the number of matching events
+  * @param createEventsAgeoutAllowed when false, ENTITY_CREATE events are left out of the result
+  * @return a new mutable list with the selected events
+  */
+ private List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn,
boolean sortOrderDesc, int ttlInDays, int offset, short limit, boolean
allowMaxResults, boolean createEventsAgeoutAllowed) throws AtlasBaseException {
List<EntityAuditEventV2> events = new ArrayList<>();
SortedMap<String, EntityAuditEventV2> subMap =
auditEventsV2.tailMap(entityId);
for (EntityAuditEventV2 event : subMap.values()) {
if (event.getEntityId().equals(entityId)) {
- events.add(event);
+ if (auditAction == null || event.getAction() == auditAction) {
+ if (event.getAction() ==
EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE &&
!createEventsAgeoutAllowed) {
+ continue;
+ }
+ events.add(event);
+ }
}
}
+
+ // Resolve the page size as an int: narrowing events.size() to short wraps to a negative
+ // value once more than Short.MAX_VALUE events match, which would corrupt the end index.
+ int pageSize = (allowMaxResults && limit == -1) ? events.size() : limit;
EntityAuditEventV2.sortEvents(events, sortByColumn, sortOrderDesc);
- events = events.subList(
- Math.min(events.size(), offset),
- Math.min(events.size(), offset + limit));
+ int fromIndex = Math.min(events.size(), offset);
+ // Never let the end index fall before the start index (negative page size), subList would throw
+ int endIndex = Math.max(fromIndex, Math.min(events.size(), offset + pageSize));
+
+ // View over the events skipped by the offset; it stays backed by the original sorted list
+ List<EntityAuditEventV2> possibleExpiredEvents = events.subList(0,
fromIndex);
+
+ events = new ArrayList<>(events.subList(fromIndex, endIndex));
+
+ // This is only for Audit Aging, including expired audit events to
result
+ if (CollectionUtils.isNotEmpty(possibleExpiredEvents) && ttlInDays > 0
) {
+ LocalDateTime now = LocalDateTime.now();
+ long ttlTimestamp =
Timestamp.valueOf(now.minusDays(ttlInDays)).getTime();
+ possibleExpiredEvents.removeIf(e -> (auditAction!= null &&
e.getAction() != auditAction) || e.getTimestamp() > ttlTimestamp);
+ if (CollectionUtils.isNotEmpty(possibleExpiredEvents)) {
+ events.addAll(possibleExpiredEvents);
+ }
+ }
+ return events;
+ }
+
+ // Audit-aging entry point of the in-memory repository: gathers, per requested action (or for all
+ // actions when entityAuditActions is empty), the events selected by the private listEventsV2
+ // overload and returns them.
+ // NOTE(review): nothing visible in this method removes the returned events from auditEventsV2;
+ // confirm whether the in-memory store is expected to keep them.
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId,
Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short
auditCount, int ttlInDays, boolean createEventsAgeoutAllowed,
AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+ List<EntityAuditEventV2> events = new ArrayList<>();
+ if (CollectionUtils.isEmpty(entityAuditActions)) {
+ // auditCount is passed as the offset and -1 as the limit: with the descending timestamp sort
+ // the first auditCount events are skipped and everything after them is selected
+ events = listEventsV2(entityId, null, "timestamp", true,
ttlInDays, auditCount, (short) -1, true, createEventsAgeoutAllowed);
+ } else {
+ // Same selection, applied separately to each requested action
+ for (EntityAuditEventV2.EntityAuditActionV2 auditAction :
entityAuditActions) {
+ List<EntityAuditEventV2> eventsByAction =
listEventsV2(entityId, auditAction, "timestamp", true, ttlInDays, auditCount,
(short) -1, true, createEventsAgeoutAllowed);
+ if (CollectionUtils.isNotEmpty(eventsByAction)) {
+ events.addAll(eventsByAction);
+ }
+ }
+ }
return events;
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
index c62bb7eaa..c357c0f59 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java
@@ -18,10 +18,12 @@
package org.apache.atlas.repository.audit;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.annotation.ConditionalOnAtlasProperty;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.springframework.stereotype.Component;
import javax.inject.Singleton;
@@ -67,6 +69,11 @@ public class NoopEntityAuditRepository implements
EntityAuditRepository {
return Collections.emptyList();
}
+ /**
+  * No-op implementation: this repository stores no audit events, so there is nothing to age out.
+  *
+  * @return an empty list (never null), consistent with the list methods of this class
+  */
+ @Override
+ public List<EntityAuditEventV2> deleteEventsV2(String entityId, Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions, short auditCount, int ttlInDays, boolean createEventsAgeoutAllowed, AtlasAuditAgingType auditAgingType) throws AtlasBaseException, AtlasException {
+ return Collections.emptyList();
+ }
+ }
+
@Override
public List<EntityAuditEventV2> listEventsV2(String entityId,
EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short
maxResultCount) {
return Collections.emptyList();
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 5780f2ea0..01aa22f69 100755
---
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -398,6 +398,9 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
// index recovery
createCommonVertexIndex(management,
PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class,
SINGLE, true, false);
+ // audit reduction
+ createCommonVertexIndex(management,
PROPERTY_KEY_AUDIT_REDUCTION_NAME, UniqueKind.GLOBAL_UNIQUE, String.class,
SINGLE, true, false);
+
//metrics
createCommonVertexIndex(management,"
__AtlasMetricsStat.metricsId", UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE,
true, false);
createCommonVertexIndex(management,"
__AtlasMetricsStat.__u_metricsId", UniqueKind.GLOBAL_UNIQUE, String.class,
SINGLE, true, false);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
new file mode 100644
index 000000000..e53416f6b
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionEntityRetrievalTask.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.*;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.Constants.*;
+import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
+import static
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.ATLAS_AUDIT_REDUCTION;
+
+
+public class AuditReductionEntityRetrievalTask extends AbstractTask {
+ private static final Logger LOG =
LoggerFactory.getLogger(AuditReductionEntityRetrievalTask.class);
+
+ private static final String VALUE_DELIMITER = ",";
+
+ private final AtlasDiscoveryService discoveryService;
+ private final AtlasTypeRegistry typeRegistry;
+ private final AtlasGraph graph;
+
+ public AuditReductionEntityRetrievalTask(AtlasTask task, AtlasGraph graph,
AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+ super(task);
+ this.graph = graph;
+ this.discoveryService = discoveryService;
+ this.typeRegistry = typeRegistry;
+ }
+
+ @Override
+ public AtlasTask.Status perform() throws Exception {
+ RequestContext.clear();
+ Map<String, Object> params = getTaskDef().getParameters();
+
+ if (MapUtils.isEmpty(params)) {
+ LOG.warn("Task: {}: Unable to process task: Parameters is not
readable!", getTaskGuid());
+ return FAILED;
+ }
+
+ String userName = getTaskDef().getCreatedBy();
+
+ if (StringUtils.isEmpty(userName)) {
+ LOG.warn("Task: {}: Unable to process task as user name is
empty!", getTaskGuid());
+
+ return FAILED;
+ }
+
+ RequestContext.get().setUser(userName, null);
+
+ try {
+ run(params);
+
+ setStatus(COMPLETE);
+ } catch (Exception e) {
+ LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+ setStatus(FAILED);
+
+ throw e;
+ } finally {
+ RequestContext.clear();
+ }
+ return getStatus();
+ }
+
+ protected void run(Map<String, Object> parameters) throws
AtlasBaseException, IOException, AtlasException {
+ try {
+ AtlasTask auditAgingTask =
createAgingTaskWithEligibleGUIDs(parameters);
+ if (auditAgingTask != null) {
+ LOG.info("{} task created for audit aging type-{}",
ATLAS_AUDIT_REDUCTION, parameters.get(AUDIT_AGING_TYPE_KEY));
+ }
+ } catch (Exception e) {
+ LOG.error("Error while retrieving entities eligible for audit
aging and creating audit aging tasks", e.getMessage());
+ }
+ }
+
+ protected AtlasTask createAgingTaskWithEligibleGUIDs(Map<String, Object>
parameters) throws AtlasBaseException {
+ final String ALL_ENTITY_TYPES = "_ALL_ENTITY_TYPES";
+ final int SEARCH_OFFSET = 0;
+ final int SEARCH_LIMIT =
AtlasConfiguration.ATLAS_AUDIT_AGING_SEARCH_MAX_LIMIT.getInt();
+
+ Set<String> entityTypes = ((Collection<String>)
parameters.get(AUDIT_AGING_ENTITY_TYPES_KEY)).stream().collect(Collectors.toSet());
+ AtlasAuditAgingType auditAgingType =
(AtlasAuditAgingType)parameters.get(AUDIT_AGING_TYPE_KEY);
+ boolean subTypesIncluded =
(boolean)parameters.get(AUDIT_AGING_SUBTYPES_INCLUDED_KEY);
+
+ SearchParameters searchEntitiesToReduceAudit = new SearchParameters();
+ searchEntitiesToReduceAudit.setTypeName(ALL_ENTITY_TYPES);
+ searchEntitiesToReduceAudit.setOffset(SEARCH_OFFSET);
+ searchEntitiesToReduceAudit.setLimit(SEARCH_LIMIT);
+ searchEntitiesToReduceAudit.setIncludeSubTypes(subTypesIncluded);
+
+ if (CollectionUtils.isNotEmpty(entityTypes)) {
+ if (!validateTypesAndIncludeSubTypes(entityTypes, auditAgingType,
subTypesIncluded)) {
+ LOG.error("All entity type names provided for audit aging
type-{} are invalid", auditAgingType);
+ return null;
+ }
+
+ String queryString = String.join(VALUE_DELIMITER, entityTypes);
+ if (auditAgingType == AtlasAuditAgingType.DEFAULT &&
StringUtils.isNotEmpty(queryString)) {
+ queryString = new
StringBuilder().append("!").append(queryString).toString();
+ }
+ searchEntitiesToReduceAudit.setQuery(queryString);
+ }
+
+ LOG.info("Getting GUIDs eligible for Audit aging type-{} with
SearchParameters: {}", auditAgingType.toString(),
searchEntitiesToReduceAudit.toString());
+
+ Set<String> guids =
discoveryService.searchGUIDsWithParameters(auditAgingType, entityTypes,
searchEntitiesToReduceAudit);
+
+ AtlasVertex auditReductionVertex = getOrCreateVertex();
+
+ AtlasTask ageoutTask =
updateVertexWithGuidsAndCreateAgingTask(auditReductionVertex,
AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType), guids, parameters);
+
+ /** For DEFAULT audit aging, "entityTypes" should be excluded from
_ALL_ENTITY_TYPES i.e., negating the queryString
+ * Including AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY to indicate the
same in AtlasTask response to user
+ */
+ if (ageoutTask != null) {
+ if (auditAgingType == AtlasAuditAgingType.DEFAULT &&
CollectionUtils.isNotEmpty(entityTypes)) {
+
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, true);
+ } else {
+
ageoutTask.getParameters().put(AUDIT_AGING_EXCLUDE_ENTITY_TYPES_KEY, false);
+ }
+ }
+
+ return ageoutTask;
+ }
+
+ private boolean validateTypesAndIncludeSubTypes(Set<String> entityTypes,
AtlasAuditAgingType auditAgingType, boolean subTypesIncluded) throws
AtlasBaseException {
+ Collection<String> allEntityTypeNames =
typeRegistry.getAllEntityDefNames();
+ Set<String> entityTypesToSearch = new HashSet<>();
+ Set<String> invalidEntityTypeNames = new HashSet<>();
+
+ entityTypes.stream().forEach(entityType -> {
+ if (entityType.endsWith("*")) {
+ String suffix = entityType.replace("*", "");
+
entityTypesToSearch.addAll(allEntityTypeNames.stream().filter(e ->
e.startsWith(suffix)).collect(Collectors.toSet()));
+ } else if (allEntityTypeNames.contains(entityType)) {
+ entityTypesToSearch.add(entityType);
+ } else {
+ invalidEntityTypeNames.add(entityType);
+ }
+ });
+
+ if (auditAgingType != AtlasAuditAgingType.DEFAULT) {
+ if (CollectionUtils.isNotEmpty(invalidEntityTypeNames)) {
+ LOG.warn("Invalid entity type name(s) {} provided for aging
type-{}", String.join(VALUE_DELIMITER, invalidEntityTypeNames), auditAgingType);
+ }
+
+ if (CollectionUtils.isEmpty(entityTypesToSearch)) {
+ return false;
+ }
+ }
+
+ entityTypes.clear();
+ entityTypes.addAll(subTypesIncluded ?
AtlasEntityType.getEntityTypesAndAllSubTypes(entityTypesToSearch, typeRegistry)
: entityTypesToSearch);
+
+ return true;
+ }
+
+ @GraphTransaction
+ private AtlasTask updateVertexWithGuidsAndCreateAgingTask(AtlasVertex
vertex, String vertexProperty, Set<String> guids, Map<String, Object> params)
throws AtlasBaseException {
+ List<String> guidsEligibleForAuditReduction =
vertex.getProperty(vertexProperty, List.class);
+ if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction) &&
CollectionUtils.isEmpty(guids)) {
+ return null;
+ }
+
+ if (CollectionUtils.isEmpty(guidsEligibleForAuditReduction)) {
+ guidsEligibleForAuditReduction = new ArrayList<>();
+ }
+
+ if (CollectionUtils.isNotEmpty(guids)) {
+ guidsEligibleForAuditReduction.addAll(guids);
+ setEncodedProperty(vertex, vertexProperty,
guidsEligibleForAuditReduction);
+ }
+
+ return discoveryService.createAndQueueAuditReductionTask(params,
ATLAS_AUDIT_REDUCTION);
+ }
+
+ private AtlasVertex getOrCreateVertex() {
+
+ AtlasGraphQuery query =
graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+ Iterator<AtlasVertex> results = query.vertices().iterator();
+
+ AtlasVertex auditReductionVertex = results.hasNext() ? results.next()
: null;
+
+ if (auditReductionVertex == null) {
+ auditReductionVertex = graph.addVertex();
+ setEncodedProperty(auditReductionVertex,
PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+ }
+ return auditReductionVertex;
+ }
+
+}
\ No newline at end of file
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
new file mode 100644
index 000000000..e8d7f6806
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTask.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.audit.EntityAuditEventV2;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.atlas.model.tasks.AtlasTask.Status.COMPLETE;
+import static org.apache.atlas.model.tasks.AtlasTask.Status.FAILED;
+import static org.apache.atlas.repository.Constants.*;
+
+import static
org.apache.atlas.repository.store.graph.v2.tasks.AuditReductionTaskFactory.AGING_TYPE_PROPERTY_KEY_MAP;
+
+
+/**
+ * Task that ages out audit events for the entity GUIDs stored on the audit-reduction vertex,
+ * working through them in batches of {@code GUID_BATCH_SIZE_PER_AGE_OUT_TASK}.
+ */
+public class AuditReductionTask extends AbstractTask {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTask.class);
+
+    private static final int GUID_BATCH_SIZE_PER_AGE_OUT_TASK = 100;
+
+    private final EntityAuditRepository auditRepository;
+    private final AtlasGraph            graph;
+
+    public AuditReductionTask(AtlasTask task, EntityAuditRepository auditRepository, AtlasGraph graph) {
+        super(task);
+
+        this.auditRepository = auditRepository;
+        this.graph           = graph;
+    }
+
+    /**
+     * Validates the task definition, runs the audit reduction as the user that created the task
+     * and records the resulting status.
+     *
+     * @return {@code FAILED} when the task has no parameters or no creator, otherwise the status set after {@link #run(Map)}
+     * @throws Exception any exception thrown by {@link #run(Map)}; the status is set to {@code FAILED} first
+     */
+    @Override
+    public AtlasTask.Status perform() throws Exception {
+        RequestContext.clear();
+
+        Map<String, Object> params = getTaskDef().getParameters();
+
+        if (MapUtils.isEmpty(params)) {
+            LOG.warn("Task: {}: Unable to process task: Parameters is not readable!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        String userName = getTaskDef().getCreatedBy();
+
+        if (StringUtils.isEmpty(userName)) {
+            LOG.warn("Task: {}: Unable to process task as user name is empty!", getTaskGuid());
+
+            return FAILED;
+        }
+
+        RequestContext.get().setUser(userName, null);
+
+        try {
+            run(params);
+
+            setStatus(COMPLETE);
+        } catch (Exception e) {
+            LOG.error("Task: {}: Error performing task!", getTaskGuid(), e);
+
+            setStatus(FAILED);
+
+            throw e;
+        } finally {
+            RequestContext.clear();
+        }
+
+        return getStatus();
+    }
+
+    /**
+     * Reads the aging criteria from {@code parameters} and calls
+     * {@code auditRepository.deleteEventsV2} for every GUID stored on the audit-reduction vertex
+     * for the given aging type. After each batch the vertex property is rewritten with the GUIDs
+     * that are still pending ({@code null} once none are left).
+     *
+     * @param parameters task parameters: aging type, action types, audit count, TTL and the
+     *                   flag that allows aging out create events
+     */
+    protected void run(Map<String, Object> parameters) throws AtlasBaseException, IOException, AtlasException {
+        AtlasVertex vertex = findVertex();
+
+        if (vertex == null) {
+            return;
+        }
+
+        AtlasAuditAgingType auditAgingType = AtlasAuditAgingType.valueOf(String.valueOf(parameters.get(AUDIT_AGING_TYPE_KEY)));
+
+        @SuppressWarnings("unchecked")
+        Collection<String> actionTypes = (Collection<String>) parameters.get(AUDIT_AGING_ACTION_TYPES_KEY);
+
+        // Go through Number so that any integral representation of the parameter is accepted
+        int     auditCountInput           = ((Number) parameters.get(AUDIT_AGING_COUNT_KEY)).intValue();
+        // Clamp into the short range expected by the repository API
+        short   auditCount                = (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, auditCountInput));
+        int     ttl                       = ((Number) parameters.get(AUDIT_AGING_TTL_KEY)).intValue();
+        boolean createEventsAgeoutAllowed = (boolean) parameters.get(CREATE_EVENTS_AGEOUT_ALLOWED_KEY);
+        String  vertexPropertyKeyForGuids = AGING_TYPE_PROPERTY_KEY_MAP.get(auditAgingType);
+
+        List<String> entityGuidsEligibleForAuditAgeout = vertex.getProperty(vertexPropertyKeyForGuids, List.class);
+        int          guidsCount                        = CollectionUtils.isNotEmpty(entityGuidsEligibleForAuditAgeout) ? entityGuidsEligibleForAuditAgeout.size() : 0;
+        int          batchIndex                        = 1;
+
+        // A missing action-types parameter is treated as "no specific actions"
+        Set<EntityAuditEventV2.EntityAuditActionV2> entityAuditActions = actionTypes == null
+                ? new HashSet<>()
+                : actionTypes.stream().map(EntityAuditEventV2.EntityAuditActionV2::fromString).collect(Collectors.toSet());
+
+        for (int startIndex = 0; startIndex < guidsCount; startIndex += GUID_BATCH_SIZE_PER_AGE_OUT_TASK) {
+            int          endIndex   = Math.min(startIndex + GUID_BATCH_SIZE_PER_AGE_OUT_TASK, guidsCount);
+            List<String> guidsBatch = entityGuidsEligibleForAuditAgeout.subList(startIndex, endIndex);
+
+            for (String guid : guidsBatch) {
+                auditRepository.deleteEventsV2(guid, entityAuditActions, auditCount, ttl, createEventsAgeoutAllowed, auditAgingType);
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} Audit aging completed for batch-{} with guids: {}", auditAgingType, batchIndex, guidsBatch);
+            }
+
+            batchIndex++;
+
+            // Persist what is still pending so that processed GUIDs are not kept on the vertex
+            List<String> remainingGuids = endIndex < guidsCount ? new ArrayList<>(entityGuidsEligibleForAuditAgeout.subList(endIndex, guidsCount)) : null;
+
+            vertex.setProperty(vertexPropertyKeyForGuids, remainingGuids);
+        }
+    }
+
+    /**
+     * Looks up the audit-reduction vertex.
+     *
+     * @return the first vertex matching the audit-reduction name property, or {@code null} when there is none
+     */
+    public AtlasVertex findVertex() {
+        AtlasGraphQuery       query   = graph.query().has(PROPERTY_KEY_AUDIT_REDUCTION_NAME, AUDIT_REDUCTION_TYPE_NAME);
+        Iterator<AtlasVertex> results = query.vertices().iterator();
+
+        return results.hasNext() ? results.next() : null;
+    }
+}
\ No newline at end of file
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
new file mode 100644
index 000000000..7e6aca1ab
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/tasks/AuditReductionTaskFactory.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.tasks;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.discovery.AtlasDiscoveryService;
+import org.apache.atlas.model.tasks.AtlasTask;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
+import org.apache.atlas.repository.audit.EntityAuditRepository;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.tasks.AbstractTask;
+import org.apache.atlas.tasks.TaskFactory;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT;
+import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM;
+import static
org.apache.atlas.repository.Constants.PROPERTY_KEY_GUIDS_TO_SWEEPOUT;
+
+
+/**
+ * {@link TaskFactory} for the two audit-reduction task types: the entity-retrieval task and the
+ * audit-reduction (aging) task itself.
+ */
+@Component
+public class AuditReductionTaskFactory implements TaskFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AuditReductionTaskFactory.class);
+
+    private static final int    MAX_PENDING_TASKS_ALLOWED_DEFAULT = 50;
+    private static final String MAX_PENDING_TASKS_ALLOWED_KEY     = "atlas.audit.reduction.max.pending.tasks";
+
+    /** Read from {@code atlas.audit.reduction.max.pending.tasks}; 50 when the key or the configuration is unavailable. */
+    public static final int    MAX_PENDING_TASKS_ALLOWED;
+    public static final String ATLAS_AUDIT_REDUCTION                  = "ATLAS_AUDIT_REDUCTION";
+    public static final String ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL = "AUDIT_REDUCTION_ENTITY_RETRIEVAL";
+
+    /** Read-only lookup from aging type to the vertex property that holds the GUIDs for that type. */
+    public static final Map<AtlasAuditAgingType, String> AGING_TYPE_PROPERTY_KEY_MAP;
+
+    private static final List<String> supportedTypes = Collections.unmodifiableList(Arrays.asList(ATLAS_AUDIT_REDUCTION, ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL));
+
+    static {
+        Configuration configuration = null;
+
+        try {
+            configuration = ApplicationProperties.get();
+        } catch (Exception e) {
+            // Not fatal: fall back to the built-in default below
+            LOG.warn("Failed to load application properties", e);
+        }
+
+        MAX_PENDING_TASKS_ALLOWED = configuration != null
+                ? configuration.getInt(MAX_PENDING_TASKS_ALLOWED_KEY, MAX_PENDING_TASKS_ALLOWED_DEFAULT)
+                : MAX_PENDING_TASKS_ALLOWED_DEFAULT;
+
+        // Plain EnumMap instead of double-brace initialization, which creates an anonymous
+        // HashMap subclass and leaves the shared constant mutable
+        Map<AtlasAuditAgingType, String> propertyKeys = new EnumMap<>(AtlasAuditAgingType.class);
+
+        propertyKeys.put(AtlasAuditAgingType.DEFAULT, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_DEFAULT);
+        propertyKeys.put(AtlasAuditAgingType.SWEEP, PROPERTY_KEY_GUIDS_TO_SWEEPOUT);
+        propertyKeys.put(AtlasAuditAgingType.CUSTOM, PROPERTY_KEY_GUIDS_TO_AGEOUT_BY_CUSTOM);
+
+        AGING_TYPE_PROPERTY_KEY_MAP = Collections.unmodifiableMap(propertyKeys);
+    }
+
+    private final EntityAuditRepository auditRepository;
+    private final AtlasGraph            graph;
+    private final AtlasDiscoveryService discoveryService;
+    private final AtlasTypeRegistry     typeRegistry;
+
+    @Inject
+    public AuditReductionTaskFactory(EntityAuditRepository auditRepository, AtlasGraph graph, AtlasDiscoveryService discoveryService, AtlasTypeRegistry typeRegistry) {
+        this.auditRepository  = auditRepository;
+        this.graph            = graph;
+        this.discoveryService = discoveryService;
+        this.typeRegistry     = typeRegistry;
+    }
+
+    /**
+     * Creates the task implementation for the type of the given task definition.
+     *
+     * @param task the task definition
+     * @return the matching task, or {@code null} (after logging a warning) for an unsupported type
+     */
+    @Override
+    public AbstractTask create(AtlasTask task) {
+        String taskType = task.getType();
+
+        switch (taskType) {
+            case ATLAS_AUDIT_REDUCTION:
+                return new AuditReductionTask(task, auditRepository, graph);
+
+            case ATLAS_AUDIT_REDUCTION_ENTITY_RETRIEVAL:
+                return new AuditReductionEntityRetrievalTask(task, graph, discoveryService, typeRegistry);
+
+            default:
+                LOG.warn("Type: {} - {} not found!. The task will be ignored.", taskType, task.getGuid());
+
+                return null;
+        }
+    }
+
+    /** @return the task type names handled by this factory (read-only list) */
+    @Override
+    public List<String> getSupportedTypes() {
+        return supportedTypes;
+    }
+}
\ No newline at end of file
diff --git
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
index 679df3c5c..24a8bda23 100644
---
a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
+++
b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java
@@ -22,15 +22,20 @@ import org.apache.atlas.EntityAuditEvent;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.repository.Constants.AtlasAuditAgingType;
import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.commons.lang.time.DateUtils;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.testng.Assert.*;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
public class AuditRepositoryTestBase {
protected EntityAuditRepository eventRepository;
@@ -203,4 +208,62 @@ public class AuditRepositoryTestBase {
assertEquals(actual.getDetails(), expected.getDetails());
}
+    /**
+     * Seeds 13 audit events for one entity (1 create, 6 updates, 5 classification-adds and
+     * 1 delete, time-stamped one millisecond apart starting {@code ttlInDays} days ago) and then
+     * calls {@code deleteEventsV2} four times with different arguments, checking how many of the
+     * listed events are absent from each returned "deleted" list.
+     *
+     * NOTE(review): the meaning of the count / TTL / boolean arguments of deleteEventsV2 is
+     * inferred from the variable names used here - confirm against the repository interface.
+     */
+    @Test
+    public void testDeleteEventsV2() throws Exception {
+        final String      entityId  = "id1" + rand();
+        final int         ttlInDays = 1;
+        final long        baseTs    = System.currentTimeMillis() - (ttlInDays * DateUtils.MILLIS_PER_DAY);
+        final AtlasEntity entity    = new AtlasEntity(rand());
+
+        final List<EntityAuditEventV2> seeded = new ArrayList<>();
+        int seq = 0;
+
+        // Arguments are evaluated left to right: the timestamp uses the old counter value,
+        // the details string the already incremented one.
+        seeded.add(new EntityAuditEventV2(entityId, baseTs + seq++, "user-a",
+                EntityAuditEventV2.EntityAuditActionV2.ENTITY_CREATE, "details" + seq, entity));
+        seeded.add(new EntityAuditEventV2(entityId, baseTs + seq++, "user-C",
+                EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + seq, entity));
+
+        for (int round = 0; round < 5; round++) {
+            final String user = "user" + round;
+
+            seeded.add(new EntityAuditEventV2(entityId, baseTs + seq++, user,
+                    EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE, "details" + seq, entity));
+            seeded.add(new EntityAuditEventV2(entityId, baseTs + seq++, user,
+                    EntityAuditEventV2.EntityAuditActionV2.CLASSIFICATION_ADD, "details" + seq, entity));
+        }
+
+        seeded.add(new EntityAuditEventV2(entityId, baseTs + seq++, "User-b",
+                EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + seq, entity));
+
+        for (EntityAuditEventV2 seed : seeded) {
+            eventRepository.putEventsV2(seed);
+        }
+
+        final List<EntityAuditEventV2> stored =
+                eventRepository.listEventsV2(entityId, null, "timestamp", false, 0, (short) -1);
+
+        assertEquals(stored.size(), 13);
+        assertEventV2Equals(stored.get(0), seeded.get(0));
+        assertEventV2Equals(stored.get(1), seeded.get(1));
+
+        // Pass 1: restricted to ENTITY_UPDATE events, CUSTOM aging type.
+        final short updatesToKeep = 2;
+        final List<EntityAuditEventV2> purgedUpdates = eventRepository.deleteEventsV2(entityId,
+                new HashSet<>(Arrays.asList(EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE)),
+                updatesToKeep, 0, false, AtlasAuditAgingType.CUSTOM);
+
+        final List<EntityAuditEventV2> survivingUpdates = new ArrayList<>();
+        for (EntityAuditEventV2 survivor : without(stored, purgedUpdates)) {
+            if (survivor.getAction() == EntityAuditEventV2.EntityAuditActionV2.ENTITY_UPDATE) {
+                survivingUpdates.add(survivor);
+            }
+        }
+
+        assertEquals(survivingUpdates.size(), updatesToKeep);
+
+        // Pass 2: no action filter, DEFAULT aging type, boolean flag false.
+        // Survivors are computed against the initial listing, not a fresh one.
+        final short eventsToKeep = 4;
+        final List<EntityAuditEventV2> purgedKeepingCreate =
+                eventRepository.deleteEventsV2(entityId, null, eventsToKeep, 0, false, AtlasAuditAgingType.DEFAULT);
+        final List<EntityAuditEventV2> afterSecondPass = without(stored, purgedKeepingCreate);
+
+        assertEquals(afterSecondPass.size(), eventsToKeep + 1);
+        assertEventV2Equals(afterSecondPass.get(0), stored.get(0));
+        assertTrue(afterSecondPass.contains(stored.get(0)));
+
+        // Pass 3: same as pass 2 but with the boolean flag set to true.
+        final List<EntityAuditEventV2> purgedWithCreate =
+                eventRepository.deleteEventsV2(entityId, null, eventsToKeep, 0, true, AtlasAuditAgingType.DEFAULT);
+        final List<EntityAuditEventV2> afterThirdPass = without(stored, purgedWithCreate);
+
+        assertEquals(afterThirdPass.size(), eventsToKeep);
+        assertNotEquals(afterThirdPass.get(3), stored.get(0));
+        assertFalse(afterThirdPass.contains(stored.get(0)));
+
+        // Pass 4: add one event stamped "now", then delete with a non-zero TTL argument.
+        final EntityAuditEventV2 newest = new EntityAuditEventV2(entityId, System.currentTimeMillis(), "User-b",
+                EntityAuditEventV2.EntityAuditActionV2.ENTITY_DELETE, "details" + seq, entity);
+        eventRepository.putEventsV2(newest);
+
+        final List<EntityAuditEventV2> relisted =
+                eventRepository.listEventsV2(entityId, null, "timestamp", false, 0, (short) -1);
+        final List<EntityAuditEventV2> purgedByTtl =
+                eventRepository.deleteEventsV2(entityId, null, updatesToKeep, ttlInDays, true, AtlasAuditAgingType.DEFAULT);
+
+        assertEquals(purgedByTtl.size(), relisted.size() - 1);
+
+        final List<EntityAuditEventV2> afterTtlPass = without(relisted, purgedByTtl);
+
+        assertEquals(afterTtlPass.size(), 1);
+        assertEquals(newest, afterTtlPass.get(0));
+    }
+
+    /** Returns the elements of {@code all}, in order, that are not contained in {@code removed}. */
+    private static List<EntityAuditEventV2> without(List<EntityAuditEventV2> all, List<EntityAuditEventV2> removed) {
+        return all.stream().filter(candidate -> !removed.contains(candidate)).collect(Collectors.toList());
+    }
}
diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
index 1550052b4..7a0e8dd16 100644
--- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml
+++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml
@@ -445,7 +445,7 @@
-->
<lst name="defaults">
<str name="defType">edismax</str>
- <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l
iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t
1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t
1ssl_t 1udh_t 1wqt_t 4eth_t 4rgl_s 4pvp_s 4nid_s 4lxh_s 4p39_t 4wzp_t 4t1h_t
4umd_l 4vet_t 51qd_t 505h_t 53b9_t 5af9_t 5fyd_t 5kp1_t 5ibp_t 5j45_t 5hj9_t
5r0l_t 5q85_t 5y4l_l 5zph_l 5rt1_t 5xc5_t 68ed_l 69z9_l 66th_t 658l_t 6bk5_t
61ad_t 622t_t 63np_t 6ltx_t 6mmd_ [...]
+ <str name="qf">35x_t 5j9_t 7wl_t a9x_t but_t dfp_l f0l_t i6d_l
iyt_l jr9_t kjp_s lc5_t m4l_s mx1_t ohx_t xz9_i 1151_t 12px_t 14at_l 15vp_t
1891_t 19tx_t 1bet_t 1czp_t 1ekl_t 1gxx_t 1iit_l 1k3p_t 1lol_t 1o1x_t 1qf9_t
1ssl_t 1v5x_t 1wqt_t 1z45_t 4h6t_t 4ttx_s 4s91_s 4pvp_s 4oat_s 4rgl_t 4zd1_t
4vet_t 4wzp_l 4xs5_t 543p_t 52it_t 55ol_t 5csl_t 5ibp_t 5n2d_t 5kp1_t 5lhh_t
5jwl_t 5tdx_t 5slh_t 60hx_l 622t_l 5u6d_t 5zph_t 6arp_l 6ccl_l 696t_t 67lx_t
6dxh_t 63np_t 64g5_t 6611_t 6o79_ [...]
<str name="hl.fl">*</str>
<bool name="hl.requireFieldMatch">true</bool>
<bool name="lowercaseOperators">true</bool>
diff --git
a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
index a25a51b5a..e19beccb1 100644
--- a/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/ActiveServerFilter.java
@@ -111,7 +111,7 @@ public class ActiveServerFilter implements Filter {
final String adminUriNotFiltered[] = { "/admin/export", "/admin/import",
"/admin/importfile", "/admin/audits",
"/admin/purge", "/admin/expimp/audit", "/admin/metrics",
"/admin/server", "/admin/audit/", "admin/tasks",
- "/admin/debug/metrics"};
+ "/admin/debug/metrics", "/admin/audits/ageout"};
private boolean isFilteredURI(ServletRequest servletRequest) {
HttpServletRequest httpServletRequest = (HttpServletRequest)
servletRequest;
String requestURI = httpServletRequest.getRequestURI();
diff --git
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index b19095b48..4d59fa3d6 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -31,6 +31,7 @@ import org.apache.atlas.discovery.SearchContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.AtlasAuditEntry;
import org.apache.atlas.model.audit.AtlasAuditEntry.AuditOperation;
+import org.apache.atlas.model.audit.AuditReductionCriteria;
import org.apache.atlas.model.audit.AuditSearchParameters;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2;
@@ -52,6 +53,7 @@ import org.apache.atlas.model.metrics.AtlasMetricsStat;
import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches;
import org.apache.atlas.model.tasks.AtlasTask;
import org.apache.atlas.repository.audit.AtlasAuditService;
+import org.apache.atlas.repository.audit.AtlasAuditReductionService;
import org.apache.atlas.repository.audit.EntityAuditRepository;
import org.apache.atlas.repository.impexp.AtlasServerService;
import org.apache.atlas.repository.impexp.ExportImportAuditService;
@@ -187,6 +189,8 @@ public class AdminResource {
private final boolean isOnDemandLineageEnabled;
private final int defaultLineageNodeCount;
+ private AtlasAuditReductionService auditReductionService;
+
static {
try {
atlasProperties = ApplicationProperties.get();
@@ -202,7 +206,7 @@ public class AdminResource {
AtlasServerService serverService,
ExportImportAuditService exportImportAuditService,
AtlasEntityStore entityStore,
AtlasPatchManager patchManager, AtlasAuditService
auditService, EntityAuditRepository auditRepository,
- TaskManagement taskManagement, AtlasDebugMetricsSink
debugMetricsRESTSink) {
+ TaskManagement taskManagement, AtlasDebugMetricsSink
debugMetricsRESTSink, AtlasAuditReductionService atlasAuditReductionService) {
this.serviceState = serviceState;
this.metricsService = metricsService;
this.exportService = exportService;
@@ -219,6 +223,7 @@ public class AdminResource {
this.auditRepository = auditRepository;
this.taskManagement = taskManagement;
this.debugMetricsRESTSink = debugMetricsRESTSink;
+ this.auditReductionService = atlasAuditReductionService;
if (atlasProperties != null) {
this.defaultUIVersion =
atlasProperties.getString(DEFAULT_UI_VERSION, UI_VERSION_V2);
@@ -820,6 +825,45 @@ public class AdminResource {
}
}
+ @POST
+ @Path("/audits/ageout")
+ @Consumes(Servlets.JSON_MEDIA_TYPE)
+ @Produces(Servlets.JSON_MEDIA_TYPE)
+ public List<AtlasTask> ageoutAuditData(AuditReductionCriteria
auditReductionCriteria, @QueryParam("useAuditConfig") @DefaultValue("false")
Boolean useAuditConfig) throws AtlasBaseException {
+ AtlasPerfTracer perf = null;
+ try {
+ AtlasAuthorizationUtils.verifyAccess(new
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_AUDITS), "Admin Audits Ageout");
+
+ if (useAuditConfig) {
+ return auditReductionService.startAuditAgingByConfig();
+ }
+
+ if (!auditReductionCriteria.isAuditAgingEnabled()) {
+ LOG.warn("Audit aging should be enabled");
+ return null;
+ }
+
+ if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+ perf = AtlasPerfTracer.getPerfTracer(PERF_LOG,
"AdminResource.ageoutAuditData(" + auditReductionCriteria + ")");
+ }
+
+ updateCriteriaWithDefaultValues(auditReductionCriteria);
+
+ List<Map<String, Object>> ageoutTypeCriteriaMap =
auditReductionService.buildAgeoutCriteriaForAllAgingTypes(auditReductionCriteria);
+
+ return
auditReductionService.startAuditAgingByCriteria(ageoutTypeCriteriaMap);
+
+ } finally {
+ AtlasPerfTracer.log(perf);
+ }
+ }
+
+ private void updateCriteriaWithDefaultValues(AuditReductionCriteria
auditReductionCriteria) {
+ if (auditReductionCriteria.getDefaultAgeoutTTLInDays() <= 0) {
+
auditReductionCriteria.setDefaultAgeoutTTLInDays(AtlasConfiguration.ATLAS_AUDIT_DEFAULT_AGEOUT_TTL.getInt());
+ }
+ }
+
@POST
@Path("/audits")
@Consumes(Servlets.JSON_MEDIA_TYPE)
diff --git
a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index a4d794615..5b16ba149 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
- AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null);
Response response = adminResource.getStatus();
assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
JsonNode entity = AtlasJson.parseToV1JsonNode((String)
response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
public void testResourceGetsValueFromServiceState() throws IOException {
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
- AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null);
+ AdminResource adminResource = new AdminResource(serviceState, null,
null, null, null, null, null, null, null, null, null, null, null, null, null,
null);
Response response = adminResource.getStatus();
verify(serviceState).getState();