This is an automated email from the ASF dual-hosted git repository.
radhikakundam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new f3ef15afc ATLAS-4830: Make ignore patterns generic for all the hooks
f3ef15afc is described below
commit f3ef15afc9aa336c27aeeb427e5f2078b233d364
Author: Amruth S <[email protected]>
AuthorDate: Fri Mar 15 17:40:59 2024 +0530
ATLAS-4830: Make ignore patterns generic for all the hooks
Signed-off-by: Radhika Kundam <[email protected]>
---
.../java/org/apache/atlas/hive/hook/HiveHook.java | 15 --
.../main/java/org/apache/atlas/hook/AtlasHook.java | 85 ++++++++++-
.../notification/NotificationHookConsumer.java | 66 +++++++++
.../preprocessor/GenericEntityPreprocessor.java | 61 ++++++++
.../GenericEntityPreprocessorTest.java | 161 +++++++++++++++++++++
5 files changed, 371 insertions(+), 17 deletions(-)
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 4d74d0c48..96b77e187 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -336,21 +336,6 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
return ret;
}
-
- private boolean isMatch(String name, List<Pattern> patterns) {
- boolean ret = false;
-
- for (Pattern p : patterns) {
- if (p.matcher(name).matches()) {
- ret = true;
-
- break;
- }
- }
-
- return ret;
- }
-
public static HiveHookObjectNamesCache getKnownObjects() {
if (knownObjects != null && knownObjects.isCacheExpired()) {
LOG.info("HiveHook.run(): purging cached databaseNames ({}) and
tableNames ({})", knownObjects.getCachedDbCount(),
knownObjects.getCachedTableCount());
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 4c70aedb9..980d4feec 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -24,10 +24,12 @@ import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasConstants;
import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.utils.AtlasConfigurationUtil;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.commons.lang.StringUtils;
@@ -38,11 +40,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
/**
@@ -65,6 +71,8 @@ public abstract class AtlasHook {
public static final String CLUSTER_NAME_KEY
= "atlas.cluster.name";
public static final String DEFAULT_CLUSTER_NAME
= "primary";
public static final String CONF_ATLAS_HOOK_MESSAGES_SORT_ENABLED
= "atlas.hook.messages.sort.enabled";
+ public static final String ATLAS_HOOK_ENTITY_IGNORE_PATTERN
= "atlas.hook.entity.ignore.pattern";
+ public static final String ATTRIBUTE_QUALIFIED_NAME
= "qualifiedName";
protected static Configuration atlasProperties;
protected static NotificationInterface notificationInterface;
@@ -79,6 +87,8 @@ public abstract class AtlasHook {
private static ExecutorService executor = null;
public static final boolean isRESTNotificationEnabled;
public static final boolean isHookMsgsSortEnabled;
+ private static final List<Pattern> entitiesToIgnore = new ArrayList<>();
+ private static boolean shouldPreprocess = false;
static {
@@ -106,6 +116,23 @@ public abstract class AtlasHook {
notificationRetryInterval =
atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
notificationInterface = NotificationProvider.get();
+ String[] patternsToIgnoreEntities =
atlasProperties.getStringArray(ATLAS_HOOK_ENTITY_IGNORE_PATTERN);
+
+ if (patternsToIgnoreEntities != null) {
+ for (String pattern: patternsToIgnoreEntities) {
+ try {
+ entitiesToIgnore.add(Pattern.compile(pattern));
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}:
{}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN, pattern);
+ }
+ }
+ LOG.info("{}={}", ATLAS_HOOK_ENTITY_IGNORE_PATTERN,
entitiesToIgnore);
+ }
+
+
+ shouldPreprocess = CollectionUtils.isNotEmpty(entitiesToIgnore);
+
String currentUser = "";
try {
@@ -163,6 +190,60 @@ public abstract class AtlasHook {
public abstract String getMessageSource();
+ protected static boolean isMatch(String qualifiedName, List<Pattern>
patterns) {
+ return patterns.stream().anyMatch((Pattern pattern) ->
pattern.matcher(qualifiedName).matches());
+ }
+
+ private static AtlasEntity.AtlasEntitiesWithExtInfo
getAtlasEntitiesWithExtInfo(HookNotification hookNotification) {
+ AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo = null;
+ switch (hookNotification.getType()) {
+ case ENTITY_CREATE_V2:
+ entitiesWithExtInfo =
((HookNotification.EntityCreateRequestV2) hookNotification).getEntities();
+ break;
+ case ENTITY_FULL_UPDATE_V2:
+ entitiesWithExtInfo =
((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities();
+ break;
+ }
+ return entitiesWithExtInfo;
+
+ }
+
+ private static void preprocessEntities(List<HookNotification>
hookNotifications) {
+ for (int i = 0; i < hookNotifications.size(); i++) {
+ HookNotification hookNotification = hookNotifications.get(i);
+
+ AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo =
getAtlasEntitiesWithExtInfo(hookNotification);
+
+ if (entitiesWithExtInfo == null) {
+ return;
+ }
+
+ List<AtlasEntity> entities = entitiesWithExtInfo.getEntities();
+ entities = ((entities != null) ? entities :
Collections.emptyList());
+ entities.removeIf((AtlasEntity entity) ->
isMatch(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+
+
+ Map<String, AtlasEntity> referredEntitiesMap =
entitiesWithExtInfo.getReferredEntities();
+ referredEntitiesMap = ((referredEntitiesMap != null) ?
referredEntitiesMap: Collections.emptyMap());
+ referredEntitiesMap.entrySet().removeIf((Map.Entry<String,
AtlasEntity> entry) ->
isMatch(entry.getValue().getAttribute(ATTRIBUTE_QUALIFIED_NAME).toString(),
entitiesToIgnore));
+
+
+ if (CollectionUtils.isEmpty(entities) &&
CollectionUtils.isEmpty(referredEntitiesMap.values())) {
+ hookNotifications.remove(i--);
+
+ LOG.info("ignored message: {}", hookNotification);
+ }
+ }
+ }
+ private static void notifyEntitiesPostPreprocess(List<HookNotification>
messages, UserGroupInformation ugi, int maxRetries, MessageSource source) {
+ if (shouldPreprocess) {
+ preprocessEntities(messages);
+ }
+ if (CollectionUtils.isNotEmpty(messages)) {
+ notifyEntitiesInternal(messages, maxRetries, ugi,
notificationInterface, logFailedMessages, failedMessagesLogger, source);
+ }
+ }
+
/**
* Notify atlas of the entity through message. The entity can be a
* complex entity with reference to other entities.
@@ -174,12 +255,12 @@ public abstract class AtlasHook {
*/
public static void notifyEntities(List<HookNotification> messages,
UserGroupInformation ugi, int maxRetries, MessageSource source) {
if (executor == null) { // send synchronously
- notifyEntitiesInternal(messages, maxRetries, ugi,
notificationInterface, logFailedMessages, failedMessagesLogger, source);
+ notifyEntitiesPostPreprocess(messages, ugi, maxRetries, source);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
- notifyEntitiesInternal(messages, maxRetries, ugi,
notificationInterface, logFailedMessages, failedMessagesLogger, source);
+ notifyEntitiesPostPreprocess(messages, ugi, maxRetries,
source);
}
});
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 936423aa2..7b02ac449 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -39,6 +39,7 @@ import
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV
import
org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
import org.apache.atlas.notification.NotificationInterface.NotificationType;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
+import org.apache.atlas.notification.preprocessor.GenericEntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
import org.apache.atlas.repository.store.graph.EntityCorrelationStore;
@@ -149,6 +150,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633
=
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
public static final String
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD =
"atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
+ public static final String CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN
=
"atlas.notification.consumer.preprocess.entity.type.ignore.pattern";
+ public static final String CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN
= "atlas.notification.consumer.preprocess.entity.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN
=
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN
=
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE
= "atlas.notification.consumer.preprocess.hive_table.cache.size";
@@ -182,6 +185,8 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private final boolean
updateHiveProcessNameWithQualifiedName;
private final int
largeMessageProcessingTimeThresholdMs;
private final boolean consumerDisabled;
+ private final List<Pattern> entityTypesToIgnore = new
ArrayList<>();
+ private final List<Pattern> entitiesToIgnore = new
ArrayList<>();
private final List<Pattern> hiveTablesToIgnore = new
ArrayList<>();
private final List<Pattern> hiveTablesToPrune = new
ArrayList<>();
private final List<String> hiveDummyDatabasesToIgnore;
@@ -246,9 +251,36 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
authnCache = (authorizeUsingMessageUser && authnCacheTtlSeconds > 0) ?
new PassiveExpiringMap<>(authnCacheTtlSeconds * 1000) : null;
+ String[] patternEntityTypesToIgnore =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN);
+ String[] patternEntitiesToIgnore =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN);
+
String[] patternHiveTablesToIgnore =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
+ if (patternEntityTypesToIgnore != null) {
+ for (String pattern: patternEntityTypesToIgnore) {
+ try {
+ this.entityTypesToIgnore.add(Pattern.compile(pattern));
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}:
{}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN, pattern);
+ }
+ }
+ LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_TYPE_IGNORE_PATTERN,
entityTypesToIgnore);
+ }
+
+ if (patternEntitiesToIgnore != null) {
+ for (String pattern: patternEntitiesToIgnore) {
+ try {
+ this.entitiesToIgnore.add(Pattern.compile(pattern));
+ } catch (Throwable t) {
+ LOG.warn("failed to compile pattern {}", pattern, t);
+ LOG.warn("Ignoring invalid pattern in configuration {}:
{}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN, pattern);
+ }
+ }
+ LOG.info("{}={}", CONSUMER_PREPROCESS_ENTITY_IGNORE_PATTERN,
entitiesToIgnore);
+ }
+
if (patternHiveTablesToIgnore != null) {
for (String pattern : patternHiveTablesToIgnore) {
try {
@@ -1073,6 +1105,36 @@ public class NotificationHookConsumer implements
Service, ActiveStateChangeHandl
}
}
+ private void preprocessEntities(PreprocessorContext context) {
+ GenericEntityPreprocessor genericEntityPreprocessor = new
GenericEntityPreprocessor(this.entityTypesToIgnore, this.entitiesToIgnore);
+
+ List<AtlasEntity> entities = context.getEntities();
+
+ if (entities != null) {
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
+ genericEntityPreprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ entities.remove(i--);
+ }
+ }
+ }
+
+ Map<String, AtlasEntity> referredEntities =
context.getReferredEntities();
+
+ if (referredEntities != null) {
+ for (Iterator<Map.Entry<String, AtlasEntity>> iterator =
referredEntities.entrySet().iterator(); iterator.hasNext(); ) {
+ AtlasEntity entity = iterator.next().getValue();
+ genericEntityPreprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
private PreprocessorContext
preProcessNotificationMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) {
PreprocessorContext context = null;
@@ -1081,6 +1143,10 @@ public class NotificationHookConsumer implements
Service, ActiveStateChangeHandl
hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore,
hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs,
rdbmsTypesRemoveOwnedRefAttrs,
s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName,
entityCorrelationManager);
+ if (CollectionUtils.isNotEmpty(this.entityTypesToIgnore) ||
CollectionUtils.isNotEmpty(this.entitiesToIgnore)) {
+ preprocessEntities(context);
+ }
+
if (context.isHivePreprocessEnabled()) {
preprocessHiveTypes(context);
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
new file mode 100644
index 000000000..195cc2945
--- /dev/null
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class GenericEntityPreprocessor extends EntityPreprocessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(GenericEntityPreprocessor.class);
+ private final List<Pattern> entitiesToIgnore;
+ private final List<Pattern> entityTypesToIgnore;
+ public GenericEntityPreprocessor(List<Pattern> entityTypesToIgnore,
List<Pattern> entitiesToIgnore) {
+ super("Generic");
+ this.entityTypesToIgnore = entityTypesToIgnore;
+ this.entitiesToIgnore = entitiesToIgnore;
+ }
+
+ private boolean isMatch(String property, List<Pattern> patterns) {
+ return patterns.stream().anyMatch((Pattern pattern) ->
pattern.matcher(property).matches());
+ }
+
+ private boolean isToBeIgnored(AtlasEntity entity) {
+ String qualifiedName = getQualifiedName(entity);
+ boolean decision = false;
+
+ if (CollectionUtils.isEmpty(this.entityTypesToIgnore)) { // Will
Ignore all entities whose qualified name matches the ignore pattern.
+ decision = isMatch(qualifiedName, this.entitiesToIgnore);
+ } else if (CollectionUtils.isEmpty(this.entitiesToIgnore)) { // Will
Ignore all entities whose type matches the regex given.
+ decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore);
+ } else { // Combination of above 2 cases.
+ decision = isMatch(entity.getTypeName(), this.entityTypesToIgnore)
&& isMatch(qualifiedName, this.entitiesToIgnore);
+ }
+
+ return decision;
+ }
+ @Override
+ public void preprocess(AtlasEntity entity, PreprocessorContext context) {
+ if (entity != null && isToBeIgnored(entity)) {
+ context.addToIgnoredEntities(entity);
+ }
+ }
+}
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
new file mode 100644
index 000000000..f777d27d7
--- /dev/null
+++
b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/GenericEntityPreprocessorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.hook.HookMessageDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import static
org.apache.atlas.notification.preprocessor.EntityPreprocessor.getQualifiedName;
+
+
+
+public class GenericEntityPreprocessorTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(GenericEntityPreprocessorTest.class);
+ private final HookMessageDeserializer deserializer = new
HookMessageDeserializer();
+
+ private PreprocessorContext getPreprocessorContext(String msgJson) {
+ HookNotification hookNotification = deserializer.deserialize(msgJson);
+
+ AtlasKafkaMessage<HookNotification> kafkaMsg = new
AtlasKafkaMessage<>(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC,
-1);
+
+ PreprocessorContext context = new PreprocessorContext(kafkaMsg, null,
Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(),
Collections.emptyList(),
+ Collections.emptyList(), Collections.emptyList(), false,
false, true,
+ false, null);
+
+ return context;
+ }
+
+ private boolean isMatch(List<Pattern> patterns, String property) {
+ for (Pattern p : patterns) {
+ if (p.matcher(property).matches()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void testEntityTypesToIgnore(String msgJson, List<Pattern>
entityTypesToIgnore) {
+ PreprocessorContext context = getPreprocessorContext(msgJson);
+ List<AtlasEntity> entities = context.getEntities();
+
+ Set<String> filteredEntitiesActual = filterEntity(entities,
(AtlasEntity entity) -> isMatch(entityTypesToIgnore, entity.getTypeName()));
+
+ if (context.getReferredEntities() != null) {
+ filteredEntitiesActual.addAll(filterEntity(new
ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) ->
isMatch(entityTypesToIgnore, entity.getTypeName())));
+ }
+
+ GenericEntityPreprocessor entityPreprocessor = new
GenericEntityPreprocessor(entityTypesToIgnore, Collections.emptyList());
+ preprocessEntities(entityPreprocessor, context);
+
+ Assert.assertEquals(filteredEntitiesActual,
context.getIgnoredEntities());
+ }
+
+ private void preprocessEntities(GenericEntityPreprocessor
genericEntityPreprocessor, PreprocessorContext context) {
+ List<AtlasEntity> entities = context.getEntities();
+
+ if (entities != null) {
+ for (int i = 0; i < entities.size(); i++) {
+ AtlasEntity entity = entities.get(i);
+ genericEntityPreprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ entities.remove(i--);
+ }
+ }
+ }
+
+ Map<String, AtlasEntity> referredEntities =
context.getReferredEntities();
+
+ if (referredEntities != null) {
+ for (Iterator<Map.Entry<String, AtlasEntity>> iterator =
referredEntities.entrySet().iterator(); iterator.hasNext(); ) {
+ AtlasEntity entity = iterator.next().getValue();
+ genericEntityPreprocessor.preprocess(entity, context);
+
+ if (context.isIgnoredEntity(entity.getGuid())) {
+ iterator.remove();
+ }
+ }
+ }
+ }
+
+ public void testEntitiesToIgnoreByQName(String msgJson, List<Pattern>
entitiesToIgnore) {
+ PreprocessorContext context = getPreprocessorContext(msgJson);
+ List<AtlasEntity> entities = context.getEntities();
+
+ Set<String> filteredEntitiesActual = filterEntity(entities,
(AtlasEntity entity) -> isMatch(entitiesToIgnore, getQualifiedName(entity)));
+ if (context.getReferredEntities() != null) {
+ filteredEntitiesActual.addAll(filterEntity(new
ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) ->
isMatch(entitiesToIgnore, getQualifiedName(entity))));
+ }
+
+ GenericEntityPreprocessor entityPreprocessor = new
GenericEntityPreprocessor(Collections.emptyList(), entitiesToIgnore);
+ preprocessEntities(entityPreprocessor, context);
+
+
+ Assert.assertEquals(filteredEntitiesActual,
context.getIgnoredEntities());
+ }
+
+ private Set<String> filterEntity(List<AtlasEntity> entities,
Predicate<AtlasEntity> predicate) {
+ Set<String> filteredEntitiesActual = new HashSet<>();
+
+ if (entities != null) {
+ for (AtlasEntity entity: entities) {
+ if (predicate.test(entity)) {
+ filteredEntitiesActual.add(entity.getGuid());
+ }
+ }
+ }
+
+ return filteredEntitiesActual;
+ }
+
+ public void testEntitiesToIgnoreByAndTypeQName(String msgJson,
List<Pattern> entityTypesToIgnore, List<Pattern> entitiesToIgnore) {
+ PreprocessorContext context = getPreprocessorContext(msgJson);
+ List<AtlasEntity> entities = context.getEntities();
+
+ Set<String> filteredEntitiesActual = filterEntity(entities,
(AtlasEntity entity) ->
+ isMatch(entityTypesToIgnore, entity.getTypeName()) &&
isMatch(entitiesToIgnore, getQualifiedName(entity)));
+
+ if (context.getReferredEntities() != null) {
+ filteredEntitiesActual.addAll(filterEntity(new
ArrayList<>(context.getReferredEntities().values()), (AtlasEntity entity) ->
+ isMatch(entityTypesToIgnore, entity.getTypeName()) &&
isMatch(entitiesToIgnore, getQualifiedName(entity))));
+ }
+
+
+ GenericEntityPreprocessor entityPreprocessor = new
GenericEntityPreprocessor(entityTypesToIgnore, entitiesToIgnore);
+ preprocessEntities(entityPreprocessor, context);
+
+
+ Assert.assertEquals(filteredEntitiesActual,
context.getIgnoredEntities());
+ }
+}