This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 978087a88 ATLAS-4878: utility to analyze hook notifications
978087a88 is described below
commit 978087a882348f1fc1b6002a0aeb29192d8cc00a
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Sun Jun 9 18:36:56 2024 -0700
ATLAS-4878: utility to analyze hook notifications
---
distro/pom.xml | 1 +
.../src/main/assemblies/notification-analyzer.xml | 71 +++++
pom.xml | 1 +
tools/notification-analyzer/README | 119 ++++++++
tools/notification-analyzer/pom.xml | 192 ++++++++++++
.../scripts/notification-analyzer.sh | 28 ++
.../apache/atlas/tools/NotificationAnalyzer.java | 322 +++++++++++++++++++++
.../main/resources/atlas-application.properties | 18 ++
.../src/main/resources/atlas-log4j.xml | 44 +++
9 files changed, 796 insertions(+)
diff --git a/distro/pom.xml b/distro/pom.xml
index 874b944f2..66f710896 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -136,6 +136,7 @@ atlas.graph.storage.hbase.regions-per-server=1
<descriptor>src/main/assemblies/atlas-repair-index-package.xml</descriptor>
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
+
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
<tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/notification-analyzer.xml
b/distro/src/main/assemblies/notification-analyzer.xml
new file mode 100644
index 000000000..63d9f2490
--- /dev/null
+++ b/distro/src/main/assemblies/notification-analyzer.xml
@@ -0,0 +1,71 @@
+<!--
+**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+*
+-->
+<!--
+  Packages the notification analyzer as notification-analyzer.zip: all files land in the
+  notification-analyzer/ directory of the archive, flat, next to the launcher script.
+-->
+<assembly>
+    <id>notification-analyzer</id>
+    <formats>
+        <format>zip</format>
+    </formats>
+
+    <baseDirectory>notification-analyzer</baseDirectory>
+
+    <fileSets>
+        <!-- runtime libraries copied by maven-dependency-plugin (dist profile of tools/notification-analyzer) -->
+        <fileSet>
+            <directory>../tools/notification-analyzer/target/dependency</directory>
+            <outputDirectory>.</outputDirectory>
+        </fileSet>
+
+        <!-- launcher script; must be executable -->
+        <fileSet>
+            <directory>../tools/notification-analyzer/scripts</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>*.sh</include>
+            </includes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <!-- configuration files, editable after installation -->
+        <fileSet>
+            <directory>../tools/notification-analyzer/src/main/resources</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>atlas-log4j.xml</include>
+                <include>atlas-application.properties</include>
+            </includes>
+        </fileSet>
+
+        <!-- the tool's README, taken from the tool's own directory -->
+        <fileSet>
+            <directory>../tools/notification-analyzer</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>README</include>
+            </includes>
+        </fileSet>
+
+        <!-- the analyzer jar itself -->
+        <fileSet>
+            <directory>../tools/notification-analyzer/target</directory>
+            <outputDirectory>.</outputDirectory>
+            <includes>
+                <include>atlas-notification-analyzer-${project.version}.jar</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+</assembly>
diff --git a/pom.xml b/pom.xml
index 2f9aedbd2..bfb1094ff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -846,6 +846,7 @@
<module>addons/kafka-bridge</module>
<module>tools/classification-updater</module>
<module>tools/atlas-index-repair</module>
+ <module>tools/notification-analyzer</module>
<module>addons/impala-hook-api</module>
<module>addons/impala-bridge-shim</module>
<module>addons/impala-bridge</module>
diff --git a/tools/notification-analyzer/README
b/tools/notification-analyzer/README
new file mode 100644
index 000000000..7631e06d9
--- /dev/null
+++ b/tools/notification-analyzer/README
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Introduction
+ This utility analyzes hook notification messages stored in JSON format
+ in a file (one message per line), and reports the following details:
+ - number of notifications per notification type
+ - size of notifications, including notifications split into multiple messages
+ - number of entities created/updated
+ - number of entity references in notifications per entity type
+ - number of entity operations performed
+ - number of entity operations performed per entity type
+
+Setup
+ - All libraries necessary to run the utility are packaged in the following file:
+ distro/target/apache-atlas-<version>-notification-analyzer.zip
+
+ - Unzip the file in the directory where the tool needs to be installed.
+
+ - Update the log configuration in atlas-log4j.xml, if needed.
+
+
+Running the utility
+ - Execute the following command to start the utility:
+ ./notification-analyzer.sh -m message_file.json [-o output_file]
+
+ If -m is not specified, messages are read from ATLAS_HOOK.json in the current directory.
+ If -o is specified, the output is appended to the given file.
+
+ - Progress will be printed in the following format in the output file (if specified) or on stdout:
+ PROGRESS #1: analyzed 1000 notifications, 1071 messages
+ PROGRESS #2: analyzed 2000 notifications, 2131 messages
+ PROGRESS #3: analyzed 3000 notifications, 3194 messages
+ ...
+ Completed analyzing message_file.json. Time taken: 234 seconds
+
+ - Note that the number of notifications might be less than the number
+ of messages in the file when some notifications were split into
+ multiple messages due to their size.
+
+ - Logs will be written to the file /tmp/atlas-notification-analyzer.log. The location of
+ the log file can be configured using the following environment variables:
+ LOGFILE_DIR
+ LOGFILE_NAME
+
+Sample Result:
+ The utility will print the result of the analysis in the following format, in the output file (if specified) or on stdout:
+ {
+   "messages": 120816,
+   "notifications": 114755,
+   "notificationLengthAvg": 74331,
+   "notificationLengthMax": 101148684,
+   "splitNotifications": 453,
+   "splitNotificationLengthAvg": 13901446,
+   "splitNotificationLengthMax": 101148684,
+   "entities": 598435,
+   "notificationEntities": 2575347,
+   "notificationByType": {
+     "ENTITY_CREATE_V2": 49428,
+     "ENTITY_FULL_UPDATE_V2": 1597,
+     "ENTITY_PARTIAL_UPDATE_V2": 36561,
+     "ENTITY_DELETE_V2": 27169
+   },
+   "notificationEntityByType": {
+     "hdfs_path": 16417,
+     "hive_db": 20471,
+     "hive_table": 57143,
+     "hive_storagedesc": 30018,
+     "hive_column": 685384,
+     "hive_process": 41512,
+     "hive_column_lineage": 1724402
+   },
+   "entityOperations": {
+     "CREATE": 598435,
+     "UPDATE": 1913182,
+     "PARTIAL_UPDATE": 36561,
+     "DELETE": 27169
+   },
+   "entityOperationsByType": {
+     "CREATE": {
+       "hdfs_path": 10940,
+       "hive_db": 224,
+       "hive_table": 22154,
+       "hive_storagedesc": 15280,
+       "hive_column": 332332,
+       "hive_process": 23462,
+       "hive_column_lineage": 194043
+     },
+     "UPDATE": {
+       "hdfs_path": 5477,
+       "hive_column": 319559,
+       "hive_column_lineage": 1530359,
+       "hive_db": 20203,
+       "hive_process": 18050,
+       "hive_storagedesc": 13204,
+       "hive_table": 6330
+     },
+     "PARTIAL_UPDATE": {
+       "hive_column": 33493,
+       "hive_storagedesc": 1534,
+       "hive_table": 1534
+     },
+     "DELETE": {
+       "hive_db": 44,
+       "hive_table": 27125
+     }
+   }
+ }
+
diff --git a/tools/notification-analyzer/pom.xml
b/tools/notification-analyzer/pom.xml
new file mode 100644
index 000000000..2f772a76f
--- /dev/null
+++ b/tools/notification-analyzer/pom.xml
@@ -0,0 +1,192 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>apache-atlas</artifactId>
+        <groupId>org.apache.atlas</groupId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+    <artifactId>atlas-notification-analyzer</artifactId>
+    <description>Apache Atlas Notification Analyzer</description>
+    <name>Apache Atlas Notification Analyzer</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-notification</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>${commons-cli.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <version>${commons-collections.version}</version>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <!--
+          dist: copies the runtime libraries of the analyzer into target/dependency, from where the
+          distro assembly (notification-analyzer.xml) packages them. This module's own jar is not
+          copied here; the assembly includes it directly from target/, so copying it as well would
+          only add a duplicate entry to the archive.
+        -->
+        <profile>
+            <id>dist</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>copy-binaries</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>copy</goal>
+                                </goals>
+                                <configuration>
+                                    <outputDirectory>${project.build.directory}/dependency</outputDirectory>
+                                    <overWriteReleases>false</overWriteReleases>
+                                    <overWriteSnapshots>false</overWriteSnapshots>
+                                    <overWriteIfNewer>true</overWriteIfNewer>
+                                    <artifactItems>
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-intg</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>${project.groupId}</groupId>
+                                            <artifactId>atlas-notification</artifactId>
+                                            <version>${project.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-cli</groupId>
+                                            <artifactId>commons-cli</artifactId>
+                                            <version>${commons-cli.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-codec</groupId>
+                                            <artifactId>commons-codec</artifactId>
+                                            <version>${commons-codec.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-collections</groupId>
+                                            <artifactId>commons-collections</artifactId>
+                                            <version>${commons-collections.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-io</groupId>
+                                            <artifactId>commons-io</artifactId>
+                                            <version>${commons-io.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <!-- no version: resolved from the project's dependency management -->
+                                            <groupId>org.apache.commons</groupId>
+                                            <artifactId>commons-lang3</artifactId>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <!-- NOTE(review): version is hard-coded, unlike the other artifacts; consider a parent-pom property -->
+                                            <groupId>org.apache.commons</groupId>
+                                            <artifactId>commons-compress</artifactId>
+                                            <version>1.26.2</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-configuration</groupId>
+                                            <artifactId>commons-configuration</artifactId>
+                                            <version>${commons-conf.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-lang</groupId>
+                                            <artifactId>commons-lang</artifactId>
+                                            <version>${commons-lang.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>commons-logging</groupId>
+                                            <artifactId>commons-logging</artifactId>
+                                            <version>${commons-logging.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.fasterxml.jackson.core</groupId>
+                                            <artifactId>jackson-annotations</artifactId>
+                                            <version>${jackson.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.fasterxml.jackson.core</groupId>
+                                            <artifactId>jackson-core</artifactId>
+                                            <version>${jackson.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>com.fasterxml.jackson.core</groupId>
+                                            <artifactId>jackson-databind</artifactId>
+                                            <version>${jackson.databind.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>log4j</groupId>
+                                            <artifactId>log4j</artifactId>
+                                            <version>${log4j.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.apache.logging.log4j</groupId>
+                                            <artifactId>log4j-core</artifactId>
+                                            <version>${log4j2.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.apache.logging.log4j</groupId>
+                                            <artifactId>log4j-api</artifactId>
+                                            <version>${log4j2.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.slf4j</groupId>
+                                            <artifactId>slf4j-api</artifactId>
+                                            <version>${slf4j.version}</version>
+                                        </artifactItem>
+                                        <artifactItem>
+                                            <groupId>org.slf4j</groupId>
+                                            <artifactId>slf4j-log4j12</artifactId>
+                                            <version>${slf4j.version}</version>
+                                        </artifactItem>
+                                    </artifactItems>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git a/tools/notification-analyzer/scripts/notification-analyzer.sh
b/tools/notification-analyzer/scripts/notification-analyzer.sh
new file mode 100644
index 000000000..179336bd0
--- /dev/null
+++ b/tools/notification-analyzer/scripts/notification-analyzer.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+  LINK=$(readlink "${PRG}")
+
+  case "${LINK}" in
+    /*) PRG="${LINK}" ;;
+    *)  PRG="$(dirname "${PRG}")/${LINK}" ;;
+  esac
+done
+
+# jars and configuration files are installed in the same directory as this script, so locate
+# them relative to the script rather than to the current working directory
+LIB_DIR=$(cd "$(dirname "${PRG}")" && pwd)
+LOGFILE_DIR="${LOGFILE_DIR:-/tmp/}"
+LOGFILE_NAME="${LOGFILE_NAME:-atlas-notification-analyzer.log}"
+
+# LIB_DIR is on the classpath so that atlas-log4j.xml (given via -Dlog4j.configuration) can be
+# loaded as a classpath resource; "." is kept for configuration files in the working directory
+CP="${LIB_DIR}:."
+
+for jar in "${LIB_DIR}/"*.jar; do
+  CP="${CP}:${jar}"
+done
+
+JAVA_BIN="java"
+
+if [ -n "${JAVA_HOME}" ] && [ -x "${JAVA_HOME}/bin/java" ]; then
+  JAVA_BIN="${JAVA_HOME}/bin/java"
+fi
+
+# "$@" forwards each argument unchanged, including arguments that contain spaces
+exec "${JAVA_BIN}" -cp "${CP}" -Dlog4j.configuration=atlas-log4j.xml -Datlas.log.dir="${LOGFILE_DIR}" -Datlas.log.file="${LOGFILE_NAME}" org.apache.atlas.tools.NotificationAnalyzer "$@"
diff --git
a/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
new file mode 100644
index 000000000..20b106546
--- /dev/null
+++
b/tools/notification-analyzer/src/main/java/org/apache/atlas/tools/NotificationAnalyzer.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.tools;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.notification.AtlasNotificationMessageDeserializer;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.utils.AtlasJson;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
+import
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest;
+import
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
+import
org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
+import
org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2;
+import
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Analyzes a file of hook notification messages and reports statistics about them.
+ *
+ * <p>The input file is read line by line and each line is passed to the HOOK notification
+ * deserializer. A notification split into several messages is counted once, when the
+ * deserializer returns it after its last part has been read. The report contains the number
+ * of messages and notifications, notification lengths (in characters), notification counts
+ * per type, entity references per entity type, and entity operations overall and per type.
+ *
+ * <p>CREATE versus UPDATE is inferred from the order of notifications. The first time an
+ * entity appears in a create/full-update notification, it counts as CREATE. Later appearances
+ * count as UPDATE. An entity is identified by its type name plus its qualifiedName, or its
+ * name when qualifiedName is absent. A delete notification forgets the entity.
+ */
+public class NotificationAnalyzer {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationAnalyzer.class);
+
+    // number of notifications between two progress reports
+    private static final int PROGRESS_INTERVAL = 1000;
+
+    private final String                                  msgFile;
+    private final String                                  outputFile;
+    private final AtlasNotificationMessageDeserializer    deserializer;
+    // LinkedHashMap: keys are reported in first-seen order, so the output is deterministic for a given input
+    private final Map<String, AtomicInteger>              notificationCountByType = new LinkedHashMap<>();
+    private final AtomicInteger                           entityCount             = new AtomicInteger();
+    private final Map<String, AtomicInteger>              entityCountByType       = new LinkedHashMap<>();
+    private final Map<String, AtomicInteger>              entityOperCount         = new LinkedHashMap<>();
+    private final Map<String, Map<String, AtomicInteger>> entityOperByTypeCount   = new LinkedHashMap<>();
+    // keys (see getEntityKey()) of entities seen so far and not deleted since
+    private final Set<String>                             knownEntities           = new HashSet<>();
+    private final IntSummaryStatistics                    notificationStats       = new IntSummaryStatistics();
+    private final IntSummaryStatistics                    splitNotificationStats  = new IntSummaryStatistics();
+
+    /**
+     * Command-line entry point.
+     *
+     * <p>Options: -m/--message-file is the file to analyze (default: ATLAS_HOOK.json);
+     * -o/--output-file is the file to append progress and results to (default: stdout).
+     * Any failure is rethrown wrapped in a RuntimeException.
+     */
+    public static void main(String[] args) {
+        CommandLineParser parser  = new BasicParser();
+        Options           options = new Options();
+
+        options.addOption("m", "message-file", true, "Messages file");
+        options.addOption("o", "output-file", true, "Output file");
+
+        try {
+            CommandLine cmd     = parser.parse(options, args);
+            String      msgFile = cmd.getOptionValue("m");
+            String      outFile = cmd.getOptionValue("o");
+
+            if (msgFile == null || msgFile.isEmpty()) {
+                msgFile = "ATLAS_HOOK.json";
+            }
+
+            NotificationAnalyzer analyzer = new NotificationAnalyzer(msgFile, outFile, NotificationType.HOOK);
+
+            analyzer.analyze();
+        } catch (Throwable t) {
+            throw new RuntimeException(t);
+        }
+    }
+
+    /**
+     * @param msgFile          path of the file containing one notification message per line
+     * @param outputFile       file to append progress and results to; null or empty for stdout
+     * @param notificationType supplies the deserializer used for each message
+     */
+    public NotificationAnalyzer(String msgFile, String outputFile, NotificationType notificationType) {
+        this.msgFile      = msgFile;
+        this.outputFile   = outputFile;
+        this.deserializer = notificationType.getDeserializer();
+    }
+
+    /**
+     * Reads every message in the message file and updates the statistics.
+     *
+     * <p>Progress is reported every PROGRESS_INTERVAL notifications. At the end, the result is
+     * written as JSON, both to the output writer and to the log.
+     *
+     * @throws Exception propagated from opening/reading the files and from the deserializer
+     */
+    public void analyze() throws Exception {
+        long startTimeMs = System.currentTimeMillis();
+
+        try (BufferedReader reader = getInputReader(); PrintWriter writer = getOutputWriter()) {
+            int msgCount         = 0;
+            int notificationSize = 0; // total length of the messages read for the current notification
+
+            for (String msg = reader.readLine(); msg != null; msg = reader.readLine()) {
+                msgCount++;
+                notificationSize += msg.length();
+
+                HookNotification notification = (HookNotification) deserializer.deserialize(msg);
+
+                if (notification == null) { // part of a split notification; remaining parts follow
+                    continue;
+                }
+
+                notificationStats.accept(notificationSize);
+
+                if (notificationSize > msg.length()) { // notification was assembled from multiple messages
+                    splitNotificationStats.accept(notificationSize);
+                }
+
+                notificationSize = 0;
+
+                notificationCountByType.computeIfAbsent(notification.getType().name(), e -> new AtomicInteger()).incrementAndGet();
+
+                handleNotification(notification);
+
+                long notificationCount = notificationStats.getCount();
+
+                if ((notificationCount % PROGRESS_INTERVAL) == 0) {
+                    // number the reports by notifications, not by messages: split notifications
+                    // make the message count run ahead of the notification count
+                    long progressId = notificationCount / PROGRESS_INTERVAL;
+
+                    LOG.info("PROGRESS #{}: analyzed {} notifications, {} messages", progressId, notificationCount, msgCount);
+                    writer.printf("PROGRESS #%1$d: analyzed %2$d notifications, %3$d messages%n", progressId, notificationCount, msgCount);
+                    writer.flush();
+                }
+            }
+
+            long timeTakenSeconds = (System.currentTimeMillis() - startTimeMs) / 1000;
+
+            LOG.info("Completed analyzing {}. Time taken: {} seconds", msgFile, timeTakenSeconds);
+            writer.printf("Completed analyzing %1$s. Time taken: %2$d seconds%n", msgFile, timeTakenSeconds);
+            writer.flush();
+
+            String msg = AtlasJson.toJson(getResults(msgCount));
+
+            LOG.info(msg);
+            writer.println(msg);
+            writer.flush();
+        }
+    }
+
+    // dispatches a complete notification to the handler for its type; other types are only counted
+    private void handleNotification(HookNotification notification) {
+        switch (notification.getType()) {
+            case ENTITY_CREATE:
+                handleEntityCreate((EntityCreateRequest) notification);
+                break;
+            case ENTITY_PARTIAL_UPDATE:
+                handleEntityPartialUpdate((EntityPartialUpdateRequest) notification);
+                break;
+            case ENTITY_FULL_UPDATE:
+                handleEntityUpdate((EntityUpdateRequest) notification);
+                break;
+            case ENTITY_DELETE:
+                handleEntityDelete((EntityDeleteRequest) notification);
+                break;
+            case ENTITY_CREATE_V2:
+                handleEntityCreateV2((EntityCreateRequestV2) notification);
+                break;
+            case ENTITY_PARTIAL_UPDATE_V2:
+                handleEntityPartialUpdateV2((EntityPartialUpdateRequestV2) notification);
+                break;
+            case ENTITY_FULL_UPDATE_V2:
+                handleEntityUpdateV2((EntityUpdateRequestV2) notification);
+                break;
+            case ENTITY_DELETE_V2:
+                handleEntityDeleteV2((EntityDeleteRequestV2) notification);
+                break;
+        }
+    }
+
+    // builds the result map, in the order the keys are reported
+    private Map<String, Object> getResults(int msgCount) {
+        Map<String, Object> results = new LinkedHashMap<>();
+
+        results.put("messages", msgCount);
+        results.put("notifications", notificationStats.getCount());
+        results.put("notificationLengthAvg", (int) notificationStats.getAverage());
+        results.put("notificationLengthMax", notificationStats.getMax());
+        results.put("splitNotifications", splitNotificationStats.getCount());
+        results.put("splitNotificationLengthAvg", (int) splitNotificationStats.getAverage());
+        results.put("splitNotificationLengthMax", splitNotificationStats.getMax());
+        results.put("entities", entityCount.get());
+        results.put("notificationEntities", entityCountByType.values().stream().mapToInt(AtomicInteger::get).sum());
+        results.put("notificationByType", notificationCountByType);
+        results.put("notificationEntityByType", entityCountByType);
+        results.put("entityOperations", entityOperCount);
+        results.put("entityOperationsByType", entityOperByTypeCount);
+
+        return results;
+    }
+
+    private void handleEntityCreate(EntityCreateRequest request) {
+        if (request.getEntities() != null) {
+            for (Referenceable entity : request.getEntities()) {
+                recordEntity(entity);
+            }
+        }
+    }
+
+    private void handleEntityUpdate(EntityUpdateRequest request) {
+        if (request.getEntities() != null) {
+            for (Referenceable entity : request.getEntities()) {
+                recordEntity(entity);
+            }
+        }
+    }
+
+    private void handleEntityPartialUpdate(EntityPartialUpdateRequest request) {
+        recordEntityOperation(request.getTypeName(), "PARTIAL_UPDATE");
+    }
+
+    private void handleEntityDelete(EntityDeleteRequest request) {
+        knownEntities.remove(getEntityKey(request));
+
+        recordEntityOperation(request.getTypeName(), "DELETE");
+    }
+
+    // records the top-level entities and the referred entities of the request
+    private void handleEntityCreateV2(EntityCreateRequestV2 request) {
+        if (request.getEntities() != null) {
+            if (request.getEntities().getEntities() != null) {
+                for (AtlasEntity entity : request.getEntities().getEntities()) {
+                    recordEntity(entity);
+                }
+            }
+
+            if (request.getEntities().getReferredEntities() != null) {
+                for (AtlasEntity entity : request.getEntities().getReferredEntities().values()) {
+                    recordEntity(entity);
+                }
+            }
+        }
+    }
+
+    // records the top-level entities and the referred entities of the request
+    private void handleEntityUpdateV2(EntityUpdateRequestV2 request) {
+        if (request.getEntities() != null) {
+            if (request.getEntities().getEntities() != null) {
+                for (AtlasEntity entity : request.getEntities().getEntities()) {
+                    recordEntity(entity);
+                }
+            }
+
+            if (request.getEntities().getReferredEntities() != null) {
+                for (AtlasEntity entity : request.getEntities().getReferredEntities().values()) {
+                    recordEntity(entity);
+                }
+            }
+        }
+    }
+
+    // only the main entity is counted; referred entities of a partial update are not recorded
+    private void handleEntityPartialUpdateV2(EntityPartialUpdateRequestV2 request) {
+        if (request.getEntity() != null && request.getEntity().getEntity() != null) {
+            AtlasEntity entity = request.getEntity().getEntity();
+
+            recordEntityOperation(entity.getTypeName(), "PARTIAL_UPDATE");
+        }
+    }
+
+    private void handleEntityDeleteV2(EntityDeleteRequestV2 request) {
+        if (request.getEntities() != null) {
+            for (AtlasObjectId objId : request.getEntities()) {
+                knownEntities.remove(getEntityKey(objId));
+
+                recordEntityOperation(objId.getTypeName(), "DELETE");
+            }
+        }
+    }
+
+    private void recordEntity(AtlasEntity entity) {
+        recordEntity(entity.getTypeName(), getEntityKey(entity));
+    }
+
+    private void recordEntity(Referenceable entity) {
+        recordEntity(entity.getTypeName(), getEntityKey(entity));
+    }
+
+    // CREATE if the key was not known (never seen, or deleted since), otherwise UPDATE
+    private void recordEntity(String entityTypeName, String entityKey) {
+        String operation = knownEntities.add(entityKey) ? "CREATE" : "UPDATE";
+
+        recordEntityOperation(entityTypeName, operation);
+    }
+
+    private void recordEntityOperation(String entityTypeName, String operation) {
+        if ("CREATE".equals(operation)) {
+            entityCount.incrementAndGet();
+        }
+
+        entityCountByType.computeIfAbsent(entityTypeName, c -> new AtomicInteger()).incrementAndGet();
+        entityOperCount.computeIfAbsent(operation, c -> new AtomicInteger()).incrementAndGet();
+        entityOperByTypeCount.computeIfAbsent(operation, c -> new TreeMap<>()).computeIfAbsent(entityTypeName, c -> new AtomicInteger()).incrementAndGet();
+    }
+
+    private String getEntityKey(AtlasEntity entity) {
+        return entity.getTypeName() + ":" + getUniqueKey(entity.getAttributes());
+    }
+
+    private String getEntityKey(AtlasObjectId objectId) {
+        return objectId.getTypeName() + ":" + getUniqueKey(objectId.getUniqueAttributes());
+    }
+
+    private String getEntityKey(Referenceable entity) {
+        return entity.getTypeName() + ":" + getUniqueKey(entity.getValues());
+    }
+
+    private String getEntityKey(EntityDeleteRequest request) {
+        return request.getTypeName() + ":" + request.getAttributeValue();
+    }
+
+    // qualifiedName, else name; null when neither is present or there are no attributes at all
+    // (e.g. an object-id that references the entity by guid only)
+    private Object getUniqueKey(Map<String, Object> attributes) {
+        if (attributes == null) {
+            return null;
+        }
+
+        Object ret = attributes.get("qualifiedName");
+
+        return ret == null ? attributes.get("name") : ret;
+    }
+
+    // messages are JSON text: decode as UTF-8 regardless of the platform default charset
+    private BufferedReader getInputReader() throws IOException {
+        return new BufferedReader(new InputStreamReader(new FileInputStream(msgFile), java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    // appends (UTF-8) to outputFile when given; otherwise writes to stdout
+    private PrintWriter getOutputWriter() throws IOException {
+        if (outputFile == null || outputFile.isEmpty()) {
+            // analyze() closes the writer via try-with-resources; flush only, so System.out stays open
+            return new PrintWriter(System.out) {
+                @Override
+                public void close() {
+                    flush();
+                }
+            };
+        }
+
+        return new PrintWriter(new OutputStreamWriter(new FileOutputStream(outputFile, true), java.nio.charset.StandardCharsets.UTF_8));
+    }
+}
diff --git
a/tools/notification-analyzer/src/main/resources/atlas-application.properties
b/tools/notification-analyzer/src/main/resources/atlas-application.properties
new file mode 100644
index 000000000..01d705720
--- /dev/null
+++
b/tools/notification-analyzer/src/main/resources/atlas-application.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
diff --git a/tools/notification-analyzer/src/main/resources/atlas-log4j.xml
b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml
new file mode 100755
index 000000000..0f9182f36
--- /dev/null
+++ b/tools/notification-analyzer/src/main/resources/atlas-log4j.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <!-- console appender: available for use, but not attached to the root logger below -->
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <!--
+      rolling log file; atlas.log.dir and atlas.log.file are passed as system properties
+      by notification-analyzer.sh (LOGFILE_DIR / LOGFILE_NAME)
+    -->
+    <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+        <param name="File"           value="${atlas.log.dir}/${atlas.log.file}"/>
+        <param name="Append"         value="true"/>
+        <param name="maxFileSize"    value="100MB"/>
+        <param name="maxBackupIndex" value="20"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <!-- everything at INFO and above goes to the log file only -->
+    <root>
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </root>
+</log4j:configuration>