This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 2369e7c8e ATLAS-4881: minor improvements in notification processing
2369e7c8e is described below
commit 2369e7c8e0ad63a6dc173713823cdca9bf3b9b35
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Tue Jun 18 00:18:19 2024 -0700
ATLAS-4881: minor improvements in notification processing
---
.../org/apache/atlas/utils/AtlasPerfMetrics.java | 9 ++++-
.../notification/NotificationHookConsumer.java | 41 ++++++++++++++++------
.../preprocessor/HiveDbDDLPreprocessor.java | 5 ++-
.../preprocessor/HivePreprocessor.java | 4 ++-
.../preprocessor/HiveTableDDLPreprocessor.java | 5 ++-
.../preprocessor/PreprocessorContext.java | 10 +++---
6 files changed, 56 insertions(+), 18 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
index c72b2c3e2..a65dc8606 100644
--- a/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPerfMetrics.java
@@ -25,6 +25,7 @@ import java.util.Set;
public class AtlasPerfMetrics {
private final Map<String, Metric> metrics = new LinkedHashMap<>();
+ private long startTimeMs = -1;
public MetricRecorder getMetricRecorder(String name) {
@@ -36,6 +37,10 @@ public class AtlasPerfMetrics {
final String name = recorder.name;
final long timeTaken = recorder.getElapsedTime();
+ if (startTimeMs == -1) {
+ startTimeMs = System.currentTimeMillis();
+ }
+
Metric metric = metrics.get(name);
if (metric == null) {
@@ -51,6 +56,8 @@ public class AtlasPerfMetrics {
public void clear() {
metrics.clear();
+
+ startTimeMs = -1;
}
public boolean isEmpty() {
@@ -76,7 +83,7 @@ public class AtlasPerfMetrics {
sb.append("\"").append(metric.getName()).append("\":{\"count\":").append(metric.getInvocations()).append(",\"timeTaken\":").append(metric.getTotalTimeMSecs()).append("},");
}
- sb.setLength(sb.length() - 1); // remove last ","
+ sb.append("\"totalTime\":").append(System.currentTimeMillis() -
startTimeMs);
}
sb.append("}");
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 7b02ac449..86535275b 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -84,6 +84,7 @@ import
org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
+import javax.ws.rs.core.Response;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@@ -932,18 +933,34 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
RequestContext.get().resetEntityGuidUpdates();
exceptionClassName = e.getClass().getSimpleName();
- if (numRetries == (maxRetries - 1)) {
- String strMessage =
AbstractNotification.getMessageJson(message);
+ // don't retry in following conditions:
+ // 1. number of retry attempts reached configured
count
+ // 2. notification processing failed due to invalid
data (non-existing type, entity, ..)
+ boolean maxRetriesReached = numRetries ==
(maxRetries - 1);
+ AtlasErrorCode errorCode = (e instanceof
AtlasBaseException) ? ((AtlasBaseException) e).getAtlasErrorCode() : null;
+ boolean unrecoverableFailure = errorCode !=
null && (Response.Status.NOT_FOUND.equals(errorCode.getHttpCode()) ||
Response.Status.BAD_REQUEST.equals(errorCode.getHttpCode()));
- LOG.warn("Max retries exceeded for message {}",
strMessage, e);
+ if (maxRetriesReached || unrecoverableFailure) {
+ try {
+ String strMessage =
AbstractNotification.getMessageJson(message);
+
+ if (unrecoverableFailure) {
+ LOG.warn("Unrecoverable failure while
processing message {}", strMessage, e);
+ } else {
+ LOG.warn("Max retries exceeded for message
{}", strMessage, e);
+ }
- stats.isFailedMsg = true;
+ stats.isFailedMsg = true;
- failedMessages.add(strMessage);
+ failedMessages.add(strMessage);
- if (failedMessages.size() >= failedMsgCacheSize) {
- recordFailedMessages();
+ if (failedMessages.size() >=
failedMsgCacheSize) {
+ recordFailedMessages();
+ }
+ } catch (Throwable t) {
+ LOG.warn("error while recording failed
message: type={}, topic={}, partition={}, offset={}", message.getType(),
kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
}
+
return;
} else if (e instanceof
org.apache.atlas.repository.graphdb.AtlasSchemaViolationException) {
LOG.warn("{}: Continuing: {}", exceptionClassName,
e.getMessage());
@@ -978,10 +995,14 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(),
kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats);
if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs)
{
- String strMessage =
AbstractNotification.getMessageJson(message);
+ try {
+ String strMessage =
AbstractNotification.getMessageJson(message);
- LOG.warn("msgProcessingTime={}, msgSize={},
topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
-
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+ LOG.warn("msgProcessingTime={}, msgSize={},
topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+ } catch (Throwable t) {
+ LOG.warn("error while recording large message:
msgProcessingTime={}, type={}, topic={}, partition={}, offset={}",
stats.timeTakenMs, message.getType(), kafkaMsg.getTopic(),
kafkaMsg.getPartition(), kafkaMsg.getOffset(), t);
+ }
}
if (auditLog != null) {
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
index dcff0939d..a59590ca4 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java
@@ -47,6 +47,9 @@ public class HiveDbDDLPreprocessor extends EntityPreprocessor
{
}
setObjectIdWithGuid(dbObject, guid);
- LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(),
qualifiedName, guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(),
qualifiedName, guid);
+ }
}
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index 083e343b0..d2aab310e 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -173,7 +173,9 @@ public class HivePreprocessor {
Object qualifiedName =
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (!Objects.equals(name, qualifiedName)) {
- LOG.info("setting {}.name={}. topic-offset={},
partition={}", entity.getTypeName(), qualifiedName,
context.getKafkaMessageOffset(), context.getKafkaPartition());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setting {}.name={}. topic-offset={},
partition={}", entity.getTypeName(), qualifiedName,
context.getKafkaMessageOffset(), context.getKafkaPartition());
+ }
entity.setAttribute(ATTRIBUTE_NAME, qualifiedName);
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
index 83d4d7c1a..383543457 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java
@@ -47,6 +47,9 @@ public class HiveTableDDLPreprocessor extends
EntityPreprocessor {
}
setObjectIdWithGuid(tableObject, guid);
- LOG.info("{}: Preprocessor: Updated: {} -> {}", getTypeName(),
qualifiedName, guid);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Preprocessor: Updated: {} -> {}", getTypeName(),
qualifiedName, guid);
+ }
}
}
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index f930d9f35..d25c32bfb 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -365,10 +365,12 @@ public class PreprocessorContext {
}
}
- if (firstEntity != null) {
- LOG.info("moved {} referred-entities to end of entities-list
(firstEntity:typeName={}, qualifiedName={}). topic-offset={}, partition={}",
referredEntitiesToMove.size(), firstEntity.getTypeName(),
EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(),
kafkaMessage.getPartition());
- } else {
- LOG.info("moved {} referred-entities to entities-list.
topic-offset={}, partition={}", referredEntitiesToMove.size(),
kafkaMessage.getOffset(), kafkaMessage.getPartition());
+ if (LOG.isDebugEnabled()) {
+ if (firstEntity != null) {
+ LOG.debug("moved {} referred-entities to end of
entities-list (firstEntity:typeName={}, qualifiedName={}). topic-offset={},
partition={}", referredEntitiesToMove.size(), firstEntity.getTypeName(),
EntityPreprocessor.getQualifiedName(firstEntity), kafkaMessage.getOffset(),
kafkaMessage.getPartition());
+ } else {
+ LOG.debug("moved {} referred-entities to entities-list.
topic-offset={}, partition={}", referredEntitiesToMove.size(),
kafkaMessage.getOffset(), kafkaMessage.getPartition());
+ }
}
referredEntitiesToMove.clear();