This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 4c49d3933 ATLAS-5021: Extract Metadata from Trino periodically (#428)
4c49d3933 is described below
commit 4c49d393382a655d179703b97758aabcffb0d5bf
Author: Pinal Shah <[email protected]>
AuthorDate: Tue Sep 9 12:15:17 2025 +0530
ATLAS-5021: Extract Metadata from Trino periodically (#428)
---
addons/models/6000-Trino/6000-trino_model.json | 185 +++++++++
addons/trino-extractor/pom.xml | 96 +++++
.../src/main/bin/run-trino-extractor.sh | 137 +++++++
.../main/conf/atlas-trino-extractor-logback.xml | 56 +++
.../src/main/conf/atlas-trino-extractor.properties | 43 ++
.../apache/atlas/trino/cli/ExtractorContext.java | 123 ++++++
.../apache/atlas/trino/cli/ExtractorService.java | 349 +++++++++++++++++
.../org/apache/atlas/trino/cli/TrinoExtractor.java | 229 +++++++++++
.../atlas/trino/client/AtlasClientHelper.java | 435 +++++++++++++++++++++
.../atlas/trino/client/TrinoClientHelper.java | 151 +++++++
.../trino/connector/AtlasEntityConnector.java | 31 ++
.../atlas/trino/connector/ConnectorFactory.java | 42 ++
.../atlas/trino/connector/HiveEntityConnector.java | 118 ++++++
.../trino/connector/IcebergEntityConnector.java | 118 ++++++
.../java/org/apache/atlas/trino/model/Catalog.java | 85 ++++
.../apache/atlas/trino/cli/TrinoExtractorIT.java | 40 ++
distro/pom.xml | 1 +
.../src/main/assemblies/atlas-trino-extractor.xml | 65 +++
docs/src/documents/Tools/TrinoExtractor.md | 149 +++++++
.../graphdb/janus/AtlasJanusGraphManagement.java | 2 +-
pom.xml | 1 +
.../repository/graph/GraphBackedSearchIndexer.java | 1 -
.../repository/patches/UniqueAttributePatch.java | 1 -
23 files changed, 2455 insertions(+), 3 deletions(-)
diff --git a/addons/models/6000-Trino/6000-trino_model.json
b/addons/models/6000-Trino/6000-trino_model.json
new file mode 100644
index 000000000..734038530
--- /dev/null
+++ b/addons/models/6000-Trino/6000-trino_model.json
@@ -0,0 +1,185 @@
+{
+ "entityDefs": [
+ {
+ "name": "trino_instance",
+ "superTypes": [ "DataSet" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "hostname", "typeName": "string", "isOptional": true,
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
+ { "name": "port", "typeName": "int", "isOptional": true,
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false }
+ ]
+ },
+ {
+ "name": "trino_catalog",
+ "superTypes": [ "Asset" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "connectorType", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false },
+ { "name": "connectionUrl", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+ ]
+ },
+ {
+ "name": "trino_schema",
+ "superTypes": [ "Asset" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "parameters", "typeName": "map<string,string>",
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional":
true }
+ ]
+ },
+ {
+ "name": "trino_table",
+ "superTypes": [ "DataSet" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "comment", "typeName": "string",
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional":
true },
+ { "name": "parameters", "typeName": "map<string,string>",
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional":
true },
+ { "name": "table_type", "typeName": "string",
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional":
true }
+ ]
+ },
+ {
+ "name": "trino_column",
+ "superTypes": [ "DataSet" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "data_type", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": true, "isOptional": true },
+ { "name": "ordinal_position", "typeName": "int", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+ { "name": "column_default", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+ { "name": "comment", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+ { "name": "is_nullable", "typeName": "boolean", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+ { "name": "isPrimaryKey", "typeName": "boolean", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+ ]
+ },
+ {
+ "name": "trino_column_lineage",
+ "superTypes": [ "Process" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "expression", "typeName": "string", "cardinality": "SINGLE",
"isUnique": false, "isIndexable": false, "isOptional": true }
+ ]
+ },
+ {
+ "name": "trino_process",
+ "superTypes": [ "Process" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ { "name": "queryId", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+ { "name": "queryCreateTime", "typeName": "long", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+ { "name": "queryEndTime", "typeName": "long", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+ { "name": "queryText", "typeName": "string", "cardinality":
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }
+ ]
+ },
+ {
+ "name": "trino_schema_ddl",
+ "superTypes": [ "ddl" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [ ]
+ },
+ {
+ "name": "trino_table_ddl",
+ "superTypes": [ "ddl" ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [ ]
+ }
+ ],
+ "relationshipDefs": [
+ {
+ "name": "trino_instance_catalog",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_catalog", "name": "instance",
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+ "endDef2": { "type": "trino_instance", "name": "catalogs",
"isContainer": true, "cardinality": "SET" }
+ },
+ {
+ "name": "trino_schema_catalog",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_schema", "name": "catalog", "isContainer":
false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+ "endDef2": { "type": "trino_catalog", "name": "schemas", "isContainer":
true, "cardinality": "SET" }
+ },
+ {
+ "name": "trino_table_schema",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "AGGREGATION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_table", "name": "trinoschema",
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+ "endDef2": { "type": "trino_schema", "name": "tables",
"isContainer": true, "cardinality": "SET" }
+ },
+ {
+ "name": "trino_table_columns",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_table", "name": "columns", "isContainer":
true, "cardinality": "SET", "isLegacyAttribute": false },
+ "endDef2": { "type": "trino_column", "name": "table", "isContainer":
false, "cardinality": "SINGLE", "isLegacyAttribute": false }
+ },
+ {
+ "name": "trino_table_ddl_queries",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_table", "name": "ddlQueries",
"isContainer": true, "cardinality": "SET" },
+ "endDef2": { "type": "trino_table_ddl", "name": "table",
"isContainer": false, "cardinality": "SINGLE" }
+ },
+ {
+ "name": "trino_schema_ddl_queries",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_schema", "name": "ddlQueries",
"isContainer": true, "cardinality": "SET" },
+ "endDef2": { "type": "trino_schema_ddl", "name": "db",
"isContainer": false, "cardinality": "SINGLE" }
+ },
+ {
+ "name": "trino_process_column_lineage",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "propagateTags": "NONE",
+ "endDef1": { "type": "trino_column_lineage", "name": "query",
"isContainer": false, "cardinality": "SINGLE" },
+ "endDef2": { "type": "trino_process", "name": "columnLineages",
"isContainer": true, "cardinality": "SET" }
+ },
+ {
+ "name": "trino_schema_hive_db",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "propagateTags": "BOTH",
+ "endDef1": { "type": "hive_db", "name": "trino_schema",
"cardinality": "SET" },
+ "endDef2": { "type": "trino_schema", "name": "hive_db",
"cardinality": "SINGLE" }
+ },
+ {
+ "name": "trino_table_hive_table",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "propagateTags": "BOTH",
+ "endDef1": { "type": "hive_table", "name": "trino_table",
"cardinality": "SET" },
+ "endDef2": { "type": "trino_table", "name": "hive_table",
"cardinality": "SINGLE" }
+ },
+ {
+ "name": "trino_column_hive_column",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "propagateTags": "BOTH",
+ "endDef1": { "type": "hive_column", "name": "trino_column",
"cardinality": "SET" },
+ "endDef2": { "type": "trino_column", "name": "hive_column",
"cardinality": "SINGLE" }
+ }
+ ]
+}
diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml
new file mode 100644
index 000000000..a7a66b937
--- /dev/null
+++ b/addons/trino-extractor/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>apache-atlas</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+
+ <artifactId>atlas-trino-extractor</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Atlas Trino Bridge</name>
+ <description>Apache Atlas Trino Bridge Module</description>
+
+ <properties>
+ <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+ <checkstyle.skip>false</checkstyle.skip>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>1.9</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>403</version>
+ <!-- java8 supported version -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-intg</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>2.3.2</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <excludeScope>test</excludeScope>
+ <includeScope>compile</includeScope>
+
<outputDirectory>${project.build.directory}/dependency/trino</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
new file mode 100755
index 000000000..3763e6e39
--- /dev/null
+++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
@@ -0,0 +1,137 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+ ls=`ls -ld "${PRG}"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "${PRG}"`/"$link"
+ fi
+done
+
+BASEDIR=`dirname ${PRG}`
+BASEDIR=`cd ${BASEDIR}/..;pwd`
+
+if test -z "${JAVA_HOME}"
+then
+ JAVA_BIN=`which java`
+ JAR_BIN=`which jar`
+else
+ JAVA_BIN="${JAVA_HOME}/bin/java"
+ JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+ echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure
java and jar commands are available."
+ exit 1
+fi
+
+for i in "${BASEDIR}/lib/"*.jar; do
+ ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then
+ ATLAS_CONF_DIR="${BASEDIR}/conf/"
+fi
+ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
+
+# log dir for applications
+ATLAS_LOG_DIR="${BASEDIR}/log"
+export ATLAS_LOG_DIR
+LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log"
+
+TIME=`date +%Y%m%d%H%M%s`
+
+CP="${ATLASCPPATH}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+ ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
+ LOGFILE=`cygpath -w ${LOGFILE}`
+ HIVE_CP=`cygpath -w ${HIVE_CP}`
+ HADOOP_CP=`cygpath -w ${HADOOP_CP}`
+ CP=`cygpath -w -p ${CP}`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR
-Datlas.log.file=atlas-trino-extractor.log
+-Dlogback.configurationFile=atlas-trino-extractor-logback.xml
-Datlas.properties=atlas-trino-extractor.properties
-Djdk.httpclient.HttpClient.log=requests"
+
+IMPORT_ARGS=()
+JVM_ARGS=
+
+set -f
+while true
+do
+ option=${1}
+ shift
+
+ case "${option}" in
+ -c) IMPORT_ARGS+=("-c" "$1"); shift;;
+ -s) IMPORT_ARGS+=("-s" "$1"); shift;;
+ -t) IMPORT_ARGS+=("-t" "$1"); shift;;
+ -cx)
+ CRON_EXPR="$1"
+ shift
+ while [[ "$1" != "" && "$1" != -* ]]; do
+ CRON_EXPR="$CRON_EXPR $1"
+ shift
+ done
+ IMPORT_ARGS+=("-cx" "$CRON_EXPR");;
+ -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");;
+ --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;;
+ --table) IMPORT_ARGS+=("--table" "$1"); shift;;
+ --schema) IMPORT_ARGS+=("--schema" "$1"); shift;;
+ --cronExpression)
+ CRON_EXPR="$1"
+ shift
+ while [[ "$1" != "" && "$1" != -* ]]; do
+ CRON_EXPR="$CRON_EXPR $1"
+ shift
+ done
+ IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");;
+ --help) export HELP_OPTION="true"; IMPORT_ARGS+=("--help");;
+ -*)
+ echo "Invalid argument found"
+ export HELP_OPTION="true"; IMPORT_ARGS+=("--help")
+ break;;
+ "") break;;
+ esac
+done
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+
+if [ -z ${HELP_OPTION} ]; then
+ echo "Log file for import is $LOGFILE"
+fi
+
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}"
org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}"
+
+set +f
+
+RETVAL=$?
+if [ -z ${HELP_OPTION} ]; then
+ [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully!
+ [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at:
$LOGFILE for details.
+fi
+
+exit $RETVAL
diff --git
a/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml
b/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml
new file mode 100644
index 000000000..4cb6e4259
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
+
+ <appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${atlas.log.dir}/${atlas.log.file}</file>
+ <append>true</append>
+ <encoder>
+ <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+ </encoder>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+
<fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+ <maxHistory>20</maxHistory>
+ <cleanHistoryOnStart>true</cleanHistoryOnStart>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.apache.atlas" additivity="false" level="info">
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was
supplied but isn't a known config -->
+ <logger name="org.apache.kafka.common.config.AbstractConfig"
additivity="false" level="error">
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <root level="warn">
+ <appender-ref ref="FILE"/>
+ </root>
+</configuration>
diff --git
a/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties
b/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties
new file mode 100755
index 000000000..96d79bb33
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######## Atlas connection ############
+atlas.rest.address=http://localhost:21000/
+
+######## Trino jdbc url, If SSL is enabled, append below value with
";SSL=true" ##########
+atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/
+
+######## Trino Server Authentication ############
+atlas.trino.jdbc.user=<username>
+
+######## Trino environment name - default value "cm" ######
+#atlas.trino.namespace=cm
+
+######## Catalogs for which extractor is to run ######
+#atlas.trino.catalogs.registered=
+
+######## Datasource for which Atlas hook is enabled########
+#atlas.trino.catalog.hook.enabled.hive_catalog=true
+#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
+
+######## To run extractor for specific catalog, schema or table ########
+#atlas.trino.extractor.catalog=
+#atlas.trino.extractor.schema=
+#atlas.trino.extractor.table=
+
+######## Cron Expression to run the extractor (Below is example for every
hour) ########
+#atlas.trino.extractor.schedule=0 0 * * * ?
\ No newline at end of file
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
new file mode 100644
index 000000000..dee04300c
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+
+public class ExtractorContext {
+ static final String TRINO_NAMESPACE_CONF = "atlas.trino.namespace";
+ static final String DEFAULT_TRINO_NAMESPACE = "cm";
+ static final String TRINO_CATALOG_CONF =
"atlas.trino.extractor.catalog";
+ static final String TRINO_SCHEMA_CONF =
"atlas.trino.extractor.schema";
+ static final String TRINO_TABLE_CONF =
"atlas.trino.extractor.table";
+ static final String TRINO_SCHEDULE_CONF =
"atlas.trino.extractor.schedule";
+ static final String OPTION_CATALOG_SHORT = "c";
+ static final String OPTION_CATALOG_LONG = "catalog";
+ static final String OPTION_SCHEMA_SHORT = "s";
+ static final String OPTION_SCHEMA_LONG = "schema";
+ static final String OPTION_TABLE_SHORT = "t";
+ static final String OPTION_TABLE_LONG = "table";
+ static final String OPTION_CRON_EXPRESSION_SHORT = "cx";
+ static final String OPTION_CRON_EXPRESSION_LONG = "cronExpression";
+ static final String OPTION_HELP_SHORT = "h";
+ static final String OPTION_HELP_LONG = "help";
+
+ private final Configuration atlasConf;
+ private final String namespace;
+ private final String catalog;
+ private final String schema;
+ private final String table;
+ private final AtlasClientHelper atlasClientHelper;
+ private final TrinoClientHelper trinoClientHelper;
+ private final String cronExpression;
+
+ public ExtractorContext(CommandLine cmd) throws AtlasException,
IOException {
+ this.atlasConf = getAtlasProperties();
+ this.atlasClientHelper = createAtlasClientHelper();
+ this.trinoClientHelper = createTrinoClientHelper();
+ this.namespace = atlasConf.getString(TRINO_NAMESPACE_CONF,
DEFAULT_TRINO_NAMESPACE);
+
+ String cmdCatalog = cmd.getOptionValue(OPTION_CATALOG_SHORT);
+ String cmdSchema = cmd.getOptionValue(OPTION_SCHEMA_SHORT);
+ String cmdTable = cmd.getOptionValue(OPTION_TABLE_SHORT);
+ String cmdSchedule =
cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT);
+ this.cronExpression = StringUtils.isNotEmpty(cmdSchedule) ?
cmdSchedule : atlasConf.getString(TRINO_SCHEDULE_CONF);
+
+ if (StringUtils.isEmpty(cmdCatalog)) {
+ this.catalog = atlasConf.getString(TRINO_CATALOG_CONF);
+ this.schema = atlasConf.getString(TRINO_SCHEMA_CONF);
+ this.table = atlasConf.getString(TRINO_TABLE_CONF);
+ } else {
+ this.catalog = cmdCatalog;
+ this.schema = cmdSchema;
+ this.table = cmdTable;
+ }
+ }
+
+ public Configuration getAtlasConf() {
+ return atlasConf;
+ }
+
+ public AtlasClientHelper getAtlasConnector() {
+ return atlasClientHelper;
+ }
+
+ public TrinoClientHelper getTrinoConnector() {
+ return trinoClientHelper;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public String getCronExpression() {
+ return cronExpression;
+ }
+
+ private Configuration getAtlasProperties() throws AtlasException {
+ return ApplicationProperties.get();
+ }
+
+ private TrinoClientHelper createTrinoClientHelper() {
+ return new TrinoClientHelper(atlasConf);
+ }
+
+ private AtlasClientHelper createAtlasClientHelper() throws IOException {
+ return new AtlasClientHelper(atlasConf);
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
new file mode 100644
index 000000000..695f6b3c1
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+ public static final int THREAD_POOL_SIZE = 5;
+ public static final int CATALOG_EXECUTION_TIMEOUT = 60;
+ public static final String TRINO_NAME_ATTRIBUTE = "name";
+ private static final Logger LOG =
LoggerFactory.getLogger(ExtractorService.class);
+ private static final String TRINO_CATALOG_REGISTERED =
"atlas.trino.catalogs.registered";
+ private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX =
"atlas.trino.catalog.hook.enabled.";
+ private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX =
".namespace";
+
+ public boolean execute(ExtractorContext context) throws Exception {
+ Map<String, String> catalogs =
context.getTrinoConnector().getAllTrinoCatalogs();
+
+ LOG.info("Found {} catalogs in Trino: {}", catalogs.size(),
catalogs.keySet());
+
+ try {
+ processCatalogs(context, catalogs);
+
+ deleteCatalogs(context, catalogs);
+ } catch (AtlasServiceException e) {
+ throw new AtlasServiceException(e);
+ }
+
+ return true;
+ }
+
+ private void processCatalogs(ExtractorContext context, Map<String, String>
catalogInTrino) throws AtlasServiceException {
+ if (MapUtils.isEmpty(catalogInTrino)) {
+ LOG.debug("No catalogs found under Trino");
+ } else {
+ List<Catalog> catalogsToProcess = new ArrayList<>();
+
+ if (StringUtils.isEmpty(context.getCatalog())) {
+ String[] registeredCatalogs =
context.getAtlasConf().getStringArray(TRINO_CATALOG_REGISTERED);
+
+ if (registeredCatalogs != null) {
+ for (String registeredCatalog : registeredCatalogs) {
+ if (catalogInTrino.containsKey(registeredCatalog)) {
+ catalogsToProcess.add(getCatalogInstance(context,
registeredCatalog, catalogInTrino.get(registeredCatalog)));
+ }
+ }
+ }
+ } else {
+ if (catalogInTrino.containsKey(context.getCatalog())) {
+ Catalog catalog = getCatalogInstance(context,
context.getCatalog(), catalogInTrino.get(context.getCatalog()));
+
+ catalog.setSchemaToImport(context.getSchema());
+ catalog.setTableToImport(context.getTable());
+
+ catalogsToProcess.add(catalog);
+ }
+ }
+
+ if (CollectionUtils.isEmpty(catalogsToProcess)) {
+ LOG.info("No catalogs found to extract");
+ } else {
+ LOG.info("{} catalogs to be extracted",
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+
+ AtlasEntityWithExtInfo trinoInstanceEntity =
context.getAtlasConnector().createOrUpdateInstanceEntity(context.getNamespace());
+ ExecutorService catalogExecutor =
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(),
THREAD_POOL_SIZE));
+
+ try {
+ for (Catalog currentCatalog : catalogsToProcess) {
+ catalogExecutor.submit(() -> {
+ try {
+
currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+ processCatalog(context, currentCatalog);
+ } catch (Exception e) {
+ LOG.error("Error processing catalog: {}",
currentCatalog, e);
+ }
+ });
+ }
+ } finally {
+ catalogExecutor.shutdown();
+ }
+
+ try {
+ if
(!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT,
TimeUnit.MINUTES)) {
+ LOG.warn("Catalog processing did not complete within
{} minutes", CATALOG_EXECUTION_TIMEOUT);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ LOG.error("Catalog processing was interrupted", e);
+ }
+
+ LOG.info("Catalogs scan completed");
+ }
+ }
+ }
+
+ private void processCatalog(ExtractorContext context, Catalog catalog)
throws AtlasServiceException, SQLException {
+ if (catalog != null) {
+ LOG.info("Started extracting catalog: {}", catalog.getName());
+
+ String catalogName = catalog.getName();
+ AtlasEntityWithExtInfo trinoCatalogEntity =
context.getAtlasConnector().createOrUpdateCatalogEntity(catalog);
+ List<String> schemas =
context.getTrinoConnector().getTrinoSchemas(catalogName,
catalog.getSchemaToImport());
+
+ LOG.info("Found {} schemas in catalog {}", schemas.size(),
catalogName);
+
+ processSchemas(context, catalog, trinoCatalogEntity.getEntity(),
schemas);
+
+ if (StringUtils.isEmpty(context.getSchema())) {
+ deleteSchemas(context, schemas,
trinoCatalogEntity.getEntity().getGuid());
+ }
+ }
+ }
+
+ private void processSchemas(ExtractorContext context, Catalog catalog,
AtlasEntity trinoCatalogEntity, List<String> schemaToImport) {
+ for (String schemaName : schemaToImport) {
+ LOG.info("Started extracting schema: {}", schemaName);
+
+ try {
+ AtlasEntityWithExtInfo schemaEntity =
context.getAtlasConnector().createOrUpdateSchemaEntity(catalog,
trinoCatalogEntity, schemaName);
+ Map<String, Map<String, Object>> tables =
context.getTrinoConnector().getTrinoTables(catalog.getName(), schemaName,
catalog.getTableToImport());
+
+ LOG.info("Found {} tables in schema {}.{}", tables.size(),
catalog.getName(), schemaName);
+
+ processTables(context, catalog, schemaName,
schemaEntity.getEntity(), tables);
+
+ if (StringUtils.isEmpty(context.getTable())) {
+ deleteTables(context, new ArrayList<>(tables.keySet()),
schemaEntity.getEntity().getGuid());
+ }
+ } catch (Exception e) {
+ LOG.error("Error processing schema: {}", schemaName, e);
+ }
+ }
+ }
+
+ private void processTables(ExtractorContext context, Catalog catalog,
String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>>
trinoTables) {
+ for (String trinoTableName : trinoTables.keySet()) {
+ LOG.info("Started extracting table: {}", trinoTableName);
+
+ try {
+ Map<String, Map<String, Object>> trinoColumns =
context.getTrinoConnector().getTrinoColumns(catalog.getName(), schemaName,
trinoTableName);
+
+ LOG.info("Found {} columns in table {}.{}.{}",
trinoColumns.size(), catalog.getName(), schemaName, trinoTableName);
+
+ context.getAtlasConnector().createOrUpdateTableEntity(catalog,
schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns,
schemaEntity);
+ } catch (Exception e) {
+ LOG.error("Error processing table: {}", trinoTableName, e);
+ }
+ }
+ }
+
+ private void deleteCatalogs(ExtractorContext context, Map<String, String>
catalogInTrino) throws AtlasServiceException {
+ if (StringUtils.isEmpty(context.getCatalog())) {
+ AtlasEntityHeader trinoInstance =
context.getAtlasConnector().getTrinoInstance(context.getNamespace());
+
+ if (trinoInstance != null) {
+ Set<String> catalogsToDelete = getCatalogsToDelete(context,
catalogInTrino, trinoInstance.getGuid());
+
+ if (CollectionUtils.isNotEmpty(catalogsToDelete)) {
+ LOG.info("Atlas has {} catalogs in instance {} that are no
more present in Trino, starting to delete", catalogsToDelete,
trinoInstance.getGuid());
+
+ for (String catalogGuid : catalogsToDelete) {
+ try {
+ deleteSchemas(context, Collections.emptyList(),
catalogGuid);
+
+ LOG.info("Deleting catalog: {}", catalogGuid);
+
+
context.getAtlasConnector().deleteByGuid(Collections.singleton(catalogGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting catalog: {}",
catalogGuid, e);
+ }
+ }
+ } else {
+ LOG.info("Atlas has no catalogs to delete");
+ }
+ }
+
+ LOG.info("Catalogs scanned for deletion completed");
+ }
+ }
+
+ private void deleteSchemas(ExtractorContext context, List<String>
schemasInTrino, String catalogGuid) {
+ try {
+ Set<String> schemasToDelete = getSchemasToDelete(context,
schemasInTrino, catalogGuid);
+
+ if (CollectionUtils.isNotEmpty(schemasToDelete)) {
+ LOG.info("Atlas has {} schemas in catalog {} that are no more
present in Trino, starting to delete", schemasToDelete.size(), catalogGuid);
+
+ for (String schemaGuid : schemasToDelete) {
+ try {
+ deleteTables(context, Collections.emptyList(),
schemaGuid);
+
+ LOG.info("Deleting schema: {}", schemaGuid);
+
+
context.getAtlasConnector().deleteByGuid(Collections.singleton(schemaGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error in deleting schema: {}", schemaGuid,
e);
+ }
+ }
+ } else {
+ LOG.info("Atlas has no schemas to delete in catalog {}",
catalogGuid);
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting schemas in catalog {}", catalogGuid, e);
+ }
+ }
+
+ private void deleteTables(ExtractorContext context, List<String>
tablesInTrino, String schemaGuid) {
+ try {
+ Set<String> tablesToDelete = getTablesToDelete(context,
tablesInTrino, schemaGuid);
+
+ if (CollectionUtils.isNotEmpty(tablesToDelete)) {
+ LOG.info("Atlas has {} tables in schema {} that are no more
present in Trino, starting to delete", tablesToDelete.size(), schemaGuid);
+
+ for (String tableGuid : tablesToDelete) {
+ try {
+ LOG.info("Deleting table: {}", tableGuid);
+
+
context.getAtlasConnector().deleteByGuid(Collections.singleton(tableGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting table: {}", tableGuid, e);
+ }
+ }
+ } else {
+ LOG.info("Atlas has no tables to delete in schema {}",
schemaGuid);
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting tables in schema: {}", schemaGuid, e);
+ }
+ }
+
+ private static Catalog getCatalogInstance(ExtractorContext context, String
catalogName, String connectorType) {
+ Catalog ret = null;
+
+ if (catalogName != null) {
+ boolean isHookEnabled =
context.getAtlasConf().getBoolean(TRINO_CATALOG_HOOK_ENABLED_PREFIX +
catalogName, false);
+ String hookNamespace = null;
+
+ if (isHookEnabled) {
+ String hookNamespaceProperty =
TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName +
TRINO_CATALOG_HOOK_ENABLED_SUFFIX;
+
+ hookNamespace =
context.getAtlasConf().getString(hookNamespaceProperty);
+
+ if (hookNamespace == null) {
+ LOG.warn("Atlas Hook is enabled for {}, but '{}' found
empty", catalogName, hookNamespaceProperty);
+ }
+ }
+
+ ret = new Catalog(catalogName, connectorType, isHookEnabled,
hookNamespace, context.getNamespace());
+ }
+
+ return ret;
+ }
+
+ private static Set<String> getCatalogsToDelete(ExtractorContext context,
Map<String, String> catalogInTrino, String instanceGuid) throws
AtlasServiceException {
+ Set<String> ret = Collections.emptySet();
+
+ if (instanceGuid != null) {
+ List<AtlasEntityHeader> catalogsInAtlas =
context.getAtlasConnector().getAllCatalogsInInstance(instanceGuid);
+
+ if (CollectionUtils.isNotEmpty(catalogsInAtlas)) {
+ Map<String, String> finalCatalogInTrino = catalogInTrino ==
null ? Collections.emptyMap() : catalogInTrino;
+
+ ret = catalogsInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalCatalogInTrino.containsKey(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return ret;
+ }
+
+ private static Set<String> getSchemasToDelete(ExtractorContext context,
List<String> schemasInTrino, String catalogGuid) throws AtlasServiceException {
+ Set<String> ret = Collections.emptySet();
+
+ if (catalogGuid != null) {
+ List<AtlasEntityHeader> schemasInAtlas =
context.getAtlasConnector().getAllSchemasInCatalog(catalogGuid);
+
+ if (CollectionUtils.isNotEmpty(schemasInAtlas)) {
+ List<String> finalSchemasInTrino = schemasInTrino == null ?
Collections.emptyList() : schemasInTrino;
+
+ ret = schemasInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalSchemasInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return ret;
+ }
+
+ private static Set<String> getTablesToDelete(ExtractorContext context,
List<String> tablesInTrino, String schemaGuid) throws AtlasServiceException {
+ Set<String> ret = Collections.emptySet();
+
+ if (schemaGuid != null) {
+ List<AtlasEntityHeader> tablesInAtlas =
context.getAtlasConnector().getAllTablesInSchema(schemaGuid);
+
+ if (CollectionUtils.isNotEmpty(tablesInAtlas)) {
+ List<String> finalTablesInTrino = tablesInTrino == null ?
Collections.emptyList() : tablesInTrino;
+
+ ret = tablesInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalTablesInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return ret;
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
new file mode 100644
index 000000000..7db16e7a6
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT;
+import static
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG;
+import static
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT;
+
+public class TrinoExtractor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TrinoExtractor.class);
+
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final int EXIT_CODE_HELP = 2;
+
+ private static TrinoExtractor instance;
+
+ private final ExtractorContext extractorContext;
+ private int exitCode = EXIT_CODE_FAILED;
+
+ private TrinoExtractor(String[] args) throws Exception {
+ extractorContext = createExtractorContext(args);
+ }
+
+ public static void main(String[] args) {
+ try {
+ instance = new TrinoExtractor(args);
+
+ if (instance.extractorContext != null) {
+ instance.run();
+ } else {
+ LOG.error("Extractor context is null. Cannot proceed with
extraction.");
+
+ instance.exitCode = EXIT_CODE_FAILED;
+ }
+ } catch (Exception e) {
+ LOG.error("Extraction failed", e);
+
+ instance.exitCode = EXIT_CODE_FAILED;
+ }
+
+ System.exit(instance != null ? instance.exitCode : EXIT_CODE_FAILED);
+ }
+
+ private void run() {
+ try {
+ String cronExpression = extractorContext.getCronExpression();
+
+ if (StringUtils.isNotEmpty(cronExpression)) {
+ if (!CronExpression.isValidExpression(cronExpression)) {
+ LOG.error("Invalid cron expression: {}", cronExpression);
+
+ exitCode = EXIT_CODE_FAILED;
+ } else {
+ LOG.info("Scheduling extraction for cron expression: {}",
cronExpression);
+
+ JobDetail job =
JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob",
"group1").build();
+ Trigger trigger = TriggerBuilder.newTrigger()
+
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow()
+ .build();
+
+ Scheduler scheduler = null;
+
+ try {
+ scheduler = new StdSchedulerFactory().getScheduler();
+
+ scheduler.scheduleJob(job, trigger);
+ scheduler.start();
+
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException ie) {
+ LOG.info("Main thread interrupted, shutting down
scheduler.");
+
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ } finally {
+ try {
+ if (scheduler != null && !scheduler.isShutdown()) {
+ scheduler.shutdown(true);
+ }
+ } catch (SchedulerException se) {
+ LOG.warn("Error shutting down scheduler: {}",
se.getMessage());
+ }
+ }
+ }
+ } else {
+ LOG.info("Running extraction once");
+
+ ExtractorService extractorService = new ExtractorService();
+
+ if (extractorService.execute(extractorContext)) {
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error encountered. exitCode: {}", exitCode, e);
+ } finally {
+ if (extractorContext.getAtlasConnector() != null) {
+ extractorContext.getAtlasConnector().close();
+ }
+ }
+
+ System.exit(exitCode);
+ }
+
+ private ExtractorContext createExtractorContext(String[] args) throws
AtlasBaseException, IOException {
+ Options acceptedCliOptions = prepareCommandLineOptions();
+
+ try {
+ CommandLine cmd = new
BasicParser().parse(acceptedCliOptions, args, true);
+ List<String> argsNotProcessed = cmd.getArgList();
+
+ if (argsNotProcessed != null && !argsNotProcessed.isEmpty()) {
+ throw new AtlasBaseException("Unrecognized arguments.");
+ }
+
+ ExtractorContext ret = null;
+
+ if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) {
+ printUsage(acceptedCliOptions);
+ exitCode = EXIT_CODE_HELP;
+ } else {
+ ret = new ExtractorContext(cmd);
+ LOG.debug("Successfully initialized the extractor context.");
+ }
+
+ return ret;
+ } catch (ParseException | AtlasBaseException e) {
+ printUsage(acceptedCliOptions);
+
+ throw new AtlasBaseException("Invalid arguments. Reason: " +
e.getMessage(), e);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException("Error in getting Application
Properties. Reason: " + e.getMessage(), e);
+ } catch (IOException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static Options prepareCommandLineOptions() {
+ Options acceptedCliOptions = new Options();
+
+ return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT,
OPTION_CATALOG_LONG, true, "Catalog name")
+ .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true,
"Schema name")
+ .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table
name")
+ .addOption(OPTION_CRON_EXPRESSION_SHORT,
OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction")
+ .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print
help message");
+ }
+
+ private static void printUsage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(TrinoExtractor.class.getName(), options);
+ }
+
+ @DisallowConcurrentExecution
+ public static class MetadataJob implements Job {
+ private final Logger logger =
LoggerFactory.getLogger(MetadataJob.class);
+
+ public void execute(JobExecutionContext context) throws
JobExecutionException {
+ ExtractorContext extractorContext = TrinoExtractor.instance !=
null ? TrinoExtractor.instance.extractorContext : null;
+
+ if (extractorContext != null) {
+ logger.info("Executing metadata extraction at: {}",
java.time.LocalTime.now());
+
+ ExtractorService extractorService = new ExtractorService();
+
+ try {
+ if (extractorService.execute(extractorContext)) {
+ TrinoExtractor.instance.exitCode = EXIT_CODE_SUCCESS;
+ }
+ } catch (Exception e) {
+ logger.error("Error encountered: ", e);
+ throw new JobExecutionException(e);
+ }
+ logger.info("Completed executing metadata extraction at: {}",
java.time.LocalTime.now());
+ }
+ }
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
new file mode 100644
index 000000000..9bc18c16c
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
+public class AtlasClientHelper {
+ public static final String TRINO_INSTANCE =
"trino_instance";
+ public static final String TRINO_CATALOG =
"trino_catalog";
+ public static final String TRINO_SCHEMA =
"trino_schema";
+ public static final String TRINO_TABLE =
"trino_table";
+ public static final String TRINO_COLUMN =
"trino_column";
+ public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs";
+ public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE = "schemas";
+ public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE = "tables";
+ public static final String QUALIFIED_NAME_ATTRIBUTE =
"qualifiedName";
+ public static final String NAME_ATTRIBUTE = "name";
+ public static final int DEFAULT_PAGE_LIMIT = 10000;
+ private static final Logger LOG =
LoggerFactory.getLogger(AtlasClientHelper.class);
+ private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
+ private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT =
"atlas.rest.address";
+ private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE =
"connectorType";
+ private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE =
"instance";
+ private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP =
"trino_instance_catalog";
+ private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE =
"catalog";
+ private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP =
"trino_schema_catalog";
+ private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE =
"data_type";
+ private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE =
"ordinal_position";
+ private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE =
"column_default";
+ private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE =
"is_nullable";
+ private static final String TRINO_COLUMN_TABLE_ATTRIBUTE =
"table";
+ private static final String TRINO_TABLE_TYPE =
"table_type";
+ private static final String TRINO_TABLE_COLUMN_RELATIONSHIP =
"trino_table_columns";
+ private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP =
"trino_table_schema";
+ private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE =
"trinoschema";
+ private static final String TRINO_TABLE_COLUMN_ATTRIBUTE =
"columns";
+
+ private static AtlasClientV2 atlasClientV2;
+
+ /**
+ * Creates a helper backed by an AtlasClientV2 built from the given configuration.
+ * NOTE(review): this assigns a *static* field from an instance constructor, so all
+ * instances share one client and close() tears it down for everyone — confirm the
+ * extractor only ever creates a single instance.
+ */
+ public AtlasClientHelper(Configuration atlasConf) throws IOException {
+ atlasClientV2 = getAtlasClientV2Instance(atlasConf);
+ }
+
+ public List<AtlasEntityHeader> getAllCatalogsInInstance(String
instanceGuid) throws AtlasServiceException {
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE);
+
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} catalogs in Trino instance {}",
entities.size(), instanceGuid);
+ } else {
+ LOG.debug("No catalog found in Trino instance {}", instanceGuid);
+ }
+
+ return entities;
+ }
+
+ public List<AtlasEntityHeader> getAllSchemasInCatalog(String catalogGuid)
throws AtlasServiceException {
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE);
+
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} schemas in Trino catalog {}",
entities.size(), catalogGuid);
+ } else {
+ LOG.debug("No schema found in Trino catalog {}", catalogGuid);
+ }
+
+ return entities;
+ }
+
+ public List<AtlasEntityHeader> getAllTablesInSchema(String schemaGuid)
throws AtlasServiceException {
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE);
+
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} tables in Trino schema {}",
entities.size(), schemaGuid);
+ } else {
+ LOG.debug("No table found in Trino schema {}", schemaGuid);
+ }
+
+ return entities;
+ }
+
+ public AtlasEntityHeader getTrinoInstance(String namespace) {
+ try {
+ return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE,
Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace));
+ } catch (AtlasServiceException e) {
+ return null;
+ }
+ }
+
+ public AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String
trinoNamespace) throws AtlasServiceException {
+ String qualifiedName = trinoNamespace;
+ AtlasEntityWithExtInfo ret = findEntity(TRINO_INSTANCE,
qualifiedName, true, true);
+
+ if (ret == null) {
+ AtlasEntity entity = new AtlasEntity(TRINO_INSTANCE);
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, trinoNamespace);
+
+ ret = createEntity(new AtlasEntityWithExtInfo(entity));
+ }
+
+ return ret;
+ }
+
+ public AtlasEntityWithExtInfo createOrUpdateCatalogEntity(Catalog catalog)
throws AtlasServiceException {
+ String catalogName = catalog.getName();
+ String trinoNamespace = catalog.getInstanceName();
+ String qualifiedName = String.format("%s@%s", catalogName,
trinoNamespace);
+
+ AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG, qualifiedName,
true, true);
+
+ if (ret == null) {
+ AtlasEntity entity = new AtlasEntity(TRINO_CATALOG);
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, catalogName);
+ entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE,
catalog.getType());
+ entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+ catalog.getConnector().connectTrinoCatalog(this,
catalog.getHookInstanceName(), catalogName, entity, ret);
+ }
+
+ ret = createEntity(new AtlasEntityWithExtInfo(entity));
+ } else {
+ AtlasEntity entity = ret.getEntity();
+
+ entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+ catalog.getConnector().connectTrinoCatalog(this,
catalog.getHookInstanceName(), catalogName, entity, ret);
+ }
+
+ ret.setEntity(entity);
+
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public AtlasEntityWithExtInfo createOrUpdateSchemaEntity(Catalog catalog,
AtlasEntity catalogEntity, String schema) throws AtlasServiceException {
+ String qualifiedName = String.format("%s.%s@%s",
catalog.getName(), schema, catalog.getInstanceName());
+ AtlasEntityWithExtInfo ret = findEntity(TRINO_SCHEMA,
qualifiedName, true, true);
+
+ if (ret == null) {
+ AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA);
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, schema);
+ entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity,
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+ catalog.getConnector().connectTrinoSchema(this,
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+ }
+
+ ret = createEntity(new AtlasEntityWithExtInfo(entity));
+ } else {
+ AtlasEntity entity = ret.getEntity();
+
+ entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity,
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+ catalog.getConnector().connectTrinoSchema(this,
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+ }
+
+ ret.setEntity(entity);
+
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog,
String schema, String table, Map<String, Object> tableMetadata, Map<String,
Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws
AtlasServiceException {
+ String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(),
schema, table, catalog.getInstanceName());
+
+ AtlasEntityWithExtInfo ret;
+ AtlasEntityWithExtInfo tableEntityExt = findEntity(TRINO_TABLE,
qualifiedName, true, true);
+
+ if (tableEntityExt == null) {
+ tableEntityExt = toTableEntity(catalog, schema, table,
tableMetadata, trinoColumns, schemaEntity, tableEntityExt);
+ ret = createEntity(tableEntityExt);
+ } else {
+ ret = toTableEntity(catalog, schema, table, tableMetadata,
trinoColumns, schemaEntity, tableEntityExt);
+
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public void deleteByGuid(Set<String> guidTodelete) throws
AtlasServiceException {
+ if (CollectionUtils.isNotEmpty(guidTodelete)) {
+ for (String guid : guidTodelete) {
+ EntityMutationResponse response =
atlasClientV2.deleteEntityByGuid(guid);
+
+ if (response == null ||
response.getDeletedEntities().isEmpty()) {
+ LOG.debug("Entity with guid : {} is not deleted", guid);
+ } else {
+ LOG.debug("Entity with guid : {} is deleted", guid);
+ }
+ }
+ }
+ }
+
+ public AtlasEntityWithExtInfo findEntity(final String typeName, final
String qualifiedName, boolean minExtInfo, boolean ignoreRelationship) throws
AtlasServiceException {
+ try {
+ return atlasClientV2.getEntityByAttribute(typeName,
Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo,
ignoreRelationship);
+ } catch (AtlasServiceException e) {
+ if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ return null;
+ }
+
+ throw e;
+ }
+ }
+
+ public void close() {
+ if (atlasClientV2 != null) {
+ atlasClientV2.close();
+
+ atlasClientV2 = null;
+ }
+ }
+
+ private synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration
atlasConf) throws IOException {
+ if (atlasClientV2 == null) {
+ String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};
+
+ if (atlasConf != null &&
ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT)))
{
+ atlasEndpoint =
atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
+ }
+
+ if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+ String[] basicAuthUsernamePassword =
AuthenticationUtil.getBasicAuthenticationInput();
+
+ atlasClientV2 = new AtlasClientV2(atlasEndpoint,
basicAuthUsernamePassword);
+ } else {
+ UserGroupInformation ugi =
UserGroupInformation.getCurrentUser();
+
+ atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(),
atlasEndpoint);
+ }
+ }
+
+ return atlasClientV2;
+ }
+
+ private List<AtlasEntityHeader> getAllRelationshipEntities(String
entityGuid, String relationshipAttributeName) throws AtlasServiceException {
+ List<AtlasEntityHeader> entities = new ArrayList<>();
+
+ if (entityGuid != null) {
+ final int pageSize = DEFAULT_PAGE_LIMIT;
+
+ for (int i = 0; ; i++) {
+ int offset = pageSize * i;
+
+ LOG.debug("Retrieving: offset={}, pageSize={}", offset,
pageSize);
+
+ AtlasSearchResult searchResult =
atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null,
null, true, pageSize, offset);
+ List<AtlasEntityHeader> entityHeaders = searchResult == null ?
null : searchResult.getEntities();
+ int count = entityHeaders == null
? 0 : entityHeaders.size();
+
+ if (count > 0) {
+ entities.addAll(entityHeaders);
+ }
+
+ if (count < pageSize) { // last page
+ break;
+ }
+ }
+ }
+
+ return entities;
+ }
+
+ private AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String
schema, String table, Map<String, Object> tableMetadata, Map<String,
Map<String, Object>> trinoColumns, AtlasEntity schemaEntity,
AtlasEntityWithExtInfo tableEntityExt) {
+ if (tableEntityExt == null) {
+ tableEntityExt = new AtlasEntityWithExtInfo(new
AtlasEntity(TRINO_TABLE));
+ }
+
+ AtlasEntity tableEntity = tableEntityExt.getEntity();
+ List<AtlasEntity> columnEntities = new ArrayList<>();
+ String qualifiedName = String.format("%s.%s.%s@%s",
catalog.getName(), schema, table, catalog.getInstanceName());
+
+ tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ tableEntity.setAttribute(NAME_ATTRIBUTE, table);
+ tableEntity.setAttribute(TRINO_TABLE_TYPE,
tableMetadata.get(TRINO_TABLE_TYPE).toString());
+
+ for (Map.Entry<String, Map<String, Object>> columnEntry :
trinoColumns.entrySet()) {
+ AtlasEntity entity = new AtlasEntity(TRINO_COLUMN);
+ String columnName = columnEntry.getKey();
+ String colQualifiedName = String.format("%s.%s.%s.%s@%s",
catalog.getName(), schema, table, columnName, catalog.getInstanceName());
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, columnName);
+
+ if (MapUtils.isNotEmpty(columnEntry.getValue())) {
+ Map<String, Object> columnAttr = columnEntry.getValue();
+
+ entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE));
+ }
+
+ entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity,
TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+ columnEntities.add(entity);
+ }
+
+ tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity,
TRINO_TABLE_SCHEMA_RELATIONSHIP));
+ tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities,
TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+ catalog.getConnector().connectTrinoTable(this,
catalog.getHookInstanceName(), catalog.getName(), schema, table, tableEntity,
columnEntities, tableEntityExt);
+ }
+
+ tableEntityExt.addReferredEntity(schemaEntity);
+
+ for (AtlasEntity column : columnEntities) {
+ tableEntityExt.addReferredEntity(column);
+ }
+
+ return tableEntityExt;
+ }
+
/**
 * Persists a new entity (plus any referred entities) in Atlas.
 *
 * The mutation response only carries entity headers, so each created entity is fetched
 * back by guid: the first one becomes the returned entity, and every subsequent one is
 * folded into the result as a referred entity (together with its own referred entities).
 * Relationship attributes are cleared from the result before returning so the caller can
 * resubmit the entity on a later update without stale relationship payloads.
 *
 * @param entity entity (with ext-info) to create
 * @return the created entity as re-read from Atlas, or null if the response reported
 *         no CREATE operations
 * @throws AtlasServiceException if the create or any follow-up getEntityByGuid call fails
 */
private AtlasEntityWithExtInfo createEntity(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
    LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);

    AtlasEntityWithExtInfo  ret             = null;
    EntityMutationResponse  response        = atlasClientV2.createEntity(entity);
    List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

    if (CollectionUtils.isNotEmpty(createdEntities)) {
        for (AtlasEntityHeader createdEntity : createdEntities) {
            if (ret == null) {
                // first created entity: becomes the primary entity of the result
                ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

                LOG.debug("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
            } else if (ret.getEntity(createdEntity.getGuid()) == null) {
                // additional created entities (e.g. columns) not already present are
                // attached as referred entities of the primary result
                AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

                ret.addReferredEntity(newEntity.getEntity());

                if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
                    for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) {
                        ret.addReferredEntity(entry.getKey(), entry.getValue());
                    }
                }

                LOG.debug("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
            }
        }
    }

    clearRelationshipAttributes(ret);

    return ret;
}
+
+ private void updateEntity(AtlasEntityWithExtInfo entity) throws
AtlasServiceException {
+ LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(),
entity);
+
+ atlasClientV2.updateEntity(entity);
+
+ LOG.debug("Updated {} entity: name={}, guid={}",
entity.getEntity().getTypeName(),
entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME),
entity.getEntity().getGuid());
+ }
+
+ private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
+ if (entity != null) {
+ clearRelationshipAttributes(entity.getEntity());
+
+ if (entity.getReferredEntities() != null) {
+
clearRelationshipAttributes(entity.getReferredEntities().values());
+ }
+ }
+ }
+
+ private void clearRelationshipAttributes(Collection<AtlasEntity> entities)
{
+ if (entities != null) {
+ for (AtlasEntity entity : entities) {
+ clearRelationshipAttributes(entity);
+ }
+ }
+ }
+
+ private void clearRelationshipAttributes(AtlasEntity entity) {
+ if (entity != null && entity.getRelationshipAttributes() != null) {
+ entity.getRelationshipAttributes().clear();
+ }
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
new file mode 100644
index 000000000..fcb2595a8
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+public class TrinoClientHelper {
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
+
+ public TrinoClientHelper(Configuration atlasConf) {
+ this.jdbcUrl = atlasConf.getString("atlas.trino.jdbc.address");
+ this.username = atlasConf.getString("atlas.trino.jdbc.user");
+ this.password = atlasConf.getString("atlas.trino.jdbc.password", "");
+ }
+
+ public Map<String, String> getAllTrinoCatalogs() {
+ Map<String, String> catalogs = new HashMap<>();
+
+ try (Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement()) {
+ String query = "SELECT catalog_name, connector_name FROM
system.metadata.catalogs";
+ ResultSet rs = stmt.executeQuery(query);
+
+ while (rs.next()) {
+ catalogs.put(rs.getString("catalog_name"),
rs.getString("connector_name"));
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return catalogs;
+ }
+
+ public List<String> getTrinoSchemas(String catalog, String schemaToImport)
throws SQLException {
+ List<String> schemas = new ArrayList<>();
+
+ try (Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement()) {
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT schema_name FROM
").append(catalog).append(".information_schema.schemata");
+
+ if (StringUtils.isNotEmpty(schemaToImport)) {
+ query.append(" where schema_name =
'").append(schemaToImport).append("'");
+ }
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+
+ while (rs.next()) {
+ schemas.add(rs.getString("schema_name"));
+ }
+ }
+
+ return schemas;
+ }
+
+ public Map<String, Map<String, Object>> getTrinoTables(String catalog,
String schema, String tableToImport) throws SQLException {
+ Map<String, Map<String, Object>> tables = new HashMap<>();
+
+ try (Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement()) {
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT table_name, table_type FROM
").append(catalog).append(".information_schema.tables WHERE table_schema =
'").append(schema).append("'");
+
+ if (StringUtils.isNotEmpty(tableToImport)) {
+ query.append(" and table_name =
'").append(tableToImport).append("'");
+ }
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+
+ while (rs.next()) {
+ Map<String, Object> tableMetadata = new HashMap<>();
+
+ tableMetadata.put("table_name", rs.getString("table_name"));
+ tableMetadata.put("table_type", rs.getString("table_type"));
+
+ tables.put(rs.getString("table_name"), tableMetadata);
+ }
+ }
+
+ return tables;
+ }
+
+ public Map<String, Map<String, Object>> getTrinoColumns(String catalog,
String schema, String table) throws SQLException {
+ Map<String, Map<String, Object>> columns = new HashMap<>();
+
+ try (Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement()) {
+ StringBuilder query = new StringBuilder();
+
+ query.append("SELECT column_name, ordinal_position,
column_default, is_nullable, data_type FROM
").append(catalog).append(".information_schema.columns WHERE table_schema =
'").append(schema).append("' AND table_name = '").append(table).append("'");
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+
+ while (rs.next()) {
+ Map<String, Object> columnMetadata = new HashMap<>();
+
+ columnMetadata.put("ordinal_position",
rs.getInt("ordinal_position"));
+ columnMetadata.put("column_default",
rs.getString("column_default"));
+ columnMetadata.put("column_name", rs.getString("column_name"));
+
+ if (StringUtils.isNotEmpty(rs.getString("is_nullable"))) {
+ if
(StringUtils.equalsIgnoreCase(rs.getString("is_nullable"), "YES")) {
+ columnMetadata.put("is_nullable", true);
+ } else {
+ columnMetadata.put("is_nullable", false);
+ }
+ }
+
+ columnMetadata.put("data_type", rs.getString("data_type"));
+
+ columns.put(rs.getString("column_name"), columnMetadata);
+ }
+ }
+
+ return columns;
+ }
+
+ private Connection getTrinoConnection() throws SQLException {
+ return DriverManager.getConnection(jdbcUrl, username, password);
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
new file mode 100644
index 000000000..fd6bf5974
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+
+import java.util.List;
+
/**
 * Contract for connector-specific enrichment of Trino entities: implementations link a
 * trino_catalog / trino_schema / trino_table (and its columns) to the corresponding
 * native entities (e.g. hive_db, iceberg_table) already present in Atlas.
 */
public abstract class AtlasEntityConnector {
    /** Links the given trino_catalog entity to its connector-native counterpart, if any. */
    public abstract void connectTrinoCatalog(AtlasClientHelper atlasClient, String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);

    /** Links the given trino_schema entity to its connector-native counterpart, if any. */
    public abstract void connectTrinoSchema(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);

    /** Links the given trino_table entity and its columns to their connector-native counterparts, if any. */
    public abstract void connectTrinoTable(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
new file mode 100644
index 000000000..e7e9d2664
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectorFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConnectorFactory.class);
+
+ public static AtlasEntityConnector getConnector(String connectorType) {
+ switch (connectorType.toLowerCase()) {
+ case "iceberg":
+ return new IcebergEntityConnector();
+ case "hive":
+ return new HiveEntityConnector();
+ default:
+ LOG.warn("{}: unrecognized connector type", connectorType);
+
+ return null;
+ }
+ }
+
+ private ConnectorFactory() {
+ throw new UnsupportedOperationException("This is a utility class and
cannot be instantiated");
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
new file mode 100644
index 000000000..7f75f6da6
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class HiveEntityConnector extends AtlasEntityConnector {
+ public static final String HIVE_DB =
"hive_db";
+ public static final String HIVE_TABLE =
"hive_table";
+ public static final String HIVE_COLUMN =
"hive_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP =
"trino_schema_hive_db";
+ public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP =
"trino_table_hive_table";
+ public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP =
"trino_column_hive_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE =
"hive_db";
+ public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE =
"hive_table";
+ public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE =
"hive_column";
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveEntityConnector.class);
+
+ @Override
+ public void connectTrinoCatalog(AtlasClientHelper atlasClient, String
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo
entityWithExtInfo) {
+ }
+
+ @Override
+ public void connectTrinoSchema(AtlasClientHelper atlasClient, String
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity,
AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to connect entity since hook namespace
is empty, Please configure in properties");
+ } else {
+ try {
+ AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName,
schemaName);
+
+ if (hiveDb != null) {
+
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb,
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+ }
+ }
+
+ @Override
+ public void connectTrinoTable(AtlasClientHelper atlasClient, String
instanceName, String catalogName, String schemaName, String tableName,
AtlasEntity trinoTable, List<AtlasEntity> columnEntities,
AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to connect entity since hook namespace
is empty, Please configure in properties");
+ } else {
+ try {
+ AtlasEntity hiveTable = toTableEntity(atlasClient,
instanceName, schemaName, tableName);
+
+ if (hiveTable != null) {
+
trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable,
TRINO_TABLE_HIVE_TABLE_RELATIONSHIP));
+
+ for (AtlasEntity columnEntity : columnEntities) {
+ connectTrinoColumn(atlasClient, instanceName,
schemaName, tableName, columnEntity);
+ }
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+ }
+ }
+
+ private void connectTrinoColumn(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn)
throws AtlasServiceException {
+ if (instanceName != null) {
+ try {
+ AtlasEntity hiveColumn = toColumnEntity(atlasClient,
instanceName, schemaName, tableName,
trinoColumn.getAttribute("name").toString());
+
+ if (hiveColumn != null) {
+
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn,
TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP));
+ }
+ } catch (AtlasServiceException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+ }
+
+ private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName) throws AtlasServiceException {
+ String dbQualifiedName = schemaName + "@" +
instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName) throws AtlasServiceException
{
+ String tableQualifiedName = schemaName + "." +
tableName + "@" + instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(HIVE_TABLE, tableQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName, String columnName) throws
AtlasServiceException {
+ String columnQualifiedName = schemaName + "." +
tableName + "." + columnName + "@" + instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(HIVE_COLUMN, columnQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
new file mode 100644
index 000000000..8c99f36f4
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class IcebergEntityConnector extends AtlasEntityConnector {
+ public static final String HIVE_DB =
"hive_db";
+ public static final String ICEBERG_TABLE =
"iceberg_table";
+ public static final String ICEBERG_COLUMN =
"iceberg_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP =
"trino_schema_hive_db";
+ public static final String TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP =
"trino_table_iceberg_table";
+ public static final String TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP =
"trino_column_iceberg_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE =
"hive_db";
+ public static final String TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE =
"iceberg_table";
+ public static final String TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE =
"iceberg_column";
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergEntityConnector.class);
+
+ @Override
+ public void connectTrinoCatalog(AtlasClientHelper atlasClient, String
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo
entityWithExtInfo) {
+ }
+
+ @Override
+ public void connectTrinoSchema(AtlasClientHelper atlasClient, String
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity,
AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to connect entity since hook namespace
is empty, Please configure in properties");
+ } else {
+ try {
+ AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName,
schemaName);
+
+ if (hiveDb != null) {
+
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb,
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+ }
+ }
+
+ @Override
+ public void connectTrinoTable(AtlasClientHelper atlasClient, String
instanceName, String catalogName, String schemaName, String tableName,
AtlasEntity trinoTable, List<AtlasEntity> columnEntities,
AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to connect entity since hook namespace
is empty, Please configure in properties");
+ } else {
+ try {
+ AtlasEntity icebergTable = toTableEntity(atlasClient,
instanceName, schemaName, tableName);
+
+ if (icebergTable != null) {
+
trinoTable.setRelationshipAttribute(TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(icebergTable,
TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP));
+
+ for (AtlasEntity columnEntity : columnEntities) {
+ connectTrinoColumn(atlasClient, instanceName,
schemaName, tableName, columnEntity);
+ }
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+ }
+ }
+
+ private void connectTrinoColumn(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn)
throws AtlasServiceException {
+ if (instanceName != null) {
+ try {
+ AtlasEntity icebergColumn = toColumnEntity(atlasClient,
instanceName, schemaName, tableName,
trinoColumn.getAttribute("name").toString());
+
+ if (icebergColumn != null) {
+
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(icebergColumn,
TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP));
+ }
+ } catch (AtlasServiceException e) {
+ throw new AtlasServiceException(e);
+ }
+ }
+ }
+
+ private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName) throws AtlasServiceException {
+ String dbQualifiedName = schemaName + "@" +
instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName) throws AtlasServiceException
{
+ String tableQualifiedName = schemaName + "." +
tableName + "@" + instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(ICEBERG_TABLE, tableQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String
instanceName, String schemaName, String tableName, String columnName) throws
AtlasServiceException {
+ String columnQualifiedName = schemaName + "." +
tableName + "." + columnName + "@" + instanceName;
+ AtlasEntityWithExtInfo ret =
atlasClient.findEntity(ICEBERG_COLUMN, columnQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
new file mode 100644
index 000000000..cfeca6c78
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.model;
+
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.connector.AtlasEntityConnector;
+import org.apache.atlas.trino.connector.ConnectorFactory;
+
+/**
+ * Per-catalog extraction state for the Trino extractor: the catalog name, its
+ * connector type, the Atlas namespaces it maps to, and the optional
+ * {@link AtlasEntityConnector} used to link extracted Trino entities with
+ * entities created by a native Atlas hook (e.g. Hive, Iceberg).
+ */
+public class Catalog {
+    private final String name;
+    private final String type;
+    private final String hookInstanceName;
+    private final String instanceName;
+    private final AtlasEntityConnector connector;
+    private AtlasEntityWithExtInfo trinoInstanceEntity;
+    private String schemaToImport;
+    private String tableToImport;
+
+    public Catalog(String name, String type, boolean hookEnabled, String
hookInstanceName, String instanceName) {
+        this.name = name;
+        this.type = type;
+        this.hookInstanceName = hookInstanceName;
+        this.instanceName = instanceName;
+        // connector stays null when no Atlas hook is enabled for this
+        // catalog; callers must null-check getConnector() before using it
+        this.connector = hookEnabled ?
ConnectorFactory.getConnector(type) : null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    // May return null — see constructor: only set when the hook is enabled.
+    public AtlasEntityConnector getConnector() {
+        return connector;
+    }
+
+    public String getHookInstanceName() {
+        return hookInstanceName;
+    }
+
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    public AtlasEntityWithExtInfo getTrinoInstanceEntity() {
+        return trinoInstanceEntity;
+    }
+
+    public void setTrinoInstanceEntity(AtlasEntityWithExtInfo
trinoInstanceEntity) {
+        this.trinoInstanceEntity = trinoInstanceEntity;
+    }
+
+    // Optional filter: when set, extraction is restricted to this table.
+    public String getTableToImport() {
+        return tableToImport;
+    }
+
+    public void setTableToImport(String tableToImport) {
+        this.tableToImport = tableToImport;
+    }
+
+    // Optional filter: when set, extraction is restricted to this schema.
+    public String getSchemaToImport() {
+        return schemaToImport;
+    }
+
+    public void setSchemaToImport(String schemaToImport) {
+        this.schemaToImport = schemaToImport;
+    }
+}
diff --git
a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
new file mode 100644
index 000000000..e0e74ff9b
--- /dev/null
+++
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+/**
+ * Placeholder for Trino extractor integration tests; the block comment below
+ * enumerates the scenarios still to be automated.
+ */
+public class TrinoExtractorIT {
+    /* List of testcases
+    Invalid Arguments
+    Invalid cron expression
+    Test valid Catalog to be run
+    Test Instance creation
+    Test catalog creation
+    Test schema creation
+    Test table creation
+    Test if hook is enabled, hook entity if created, is connected to Trino
entity
+    Test cron doesn't trigger new job, before earlier thread completes
+    Test without cron expression
+    Test even if catalog is not registered, it should run if passed from
commandLine
+    Deleted table
+    Deleted catalog
+    Deleted column
+    Deleted schema
+    Rename catalog
+    Rename schema
+    Tag propagated*/
+}
diff --git a/distro/pom.xml b/distro/pom.xml
index af0d01060..d7e958889 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -276,6 +276,7 @@ atlas.graph.storage.hbase.regions-per-server=1
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
+
<descriptor>src/main/assemblies/atlas-trino-extractor.xml</descriptor>
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
<tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/atlas-trino-extractor.xml
b/distro/src/main/assemblies/atlas-trino-extractor.xml
new file mode 100644
index 000000000..bffffd67b
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-trino-extractor.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <id>trino-extractor</id>
+
<baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory>
+ <fileSets>
+ <fileSet>
+ <includes>
+ <include>README*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/src/main/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>atlas-trino-extractor-logback.xml</include>
+ <include>atlas-trino-extractor.properties</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/src/main/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <includes>
+ <include>run-trino-extractor.sh</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+
<directory>../addons/trino-extractor/target/dependency/trino</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/target</directory>
+ <outputDirectory>/lib</outputDirectory>
+ <includes>
+ <include>atlas-trino-extractor-*.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+</assembly>
diff --git a/docs/src/documents/Tools/TrinoExtractor.md
b/docs/src/documents/Tools/TrinoExtractor.md
new file mode 100644
index 000000000..cf660f643
--- /dev/null
+++ b/docs/src/documents/Tools/TrinoExtractor.md
@@ -0,0 +1,149 @@
+---
+name: Trino Extractor
+route: /TrinoExtractor
+menu: Documentation
+submenu: Tools
+---
+
+import themen from 'theme/styles/styled-colors';
+import * as theme from 'react-syntax-highlighter/dist/esm/styles/hljs';
+import SyntaxHighlighter from 'react-syntax-highlighter';
+
+# Trino Extractor
+
+## Overview
+
+The Trino Extractor is a comprehensive metadata extraction utility designed
for Apache Atlas integration with Trino. It provides discovery, extraction, and
synchronization of Trino metadata including catalogs, schemas, tables, and
columns into Apache Atlas for enhanced data governance and metadata management.
+
+## Key Features
+
+### Metadata Extraction
+* **Comprehensive Discovery**: Automatically discovers and extracts metadata
from Trino catalogs, schemas, tables, and columns
+* **JDBC-Based Connection**: Uses standard Trino JDBC driver for reliable
connectivity
+* **Selective Extraction**: Supports extraction for specific catalog, schema,
or table names
+
+### Atlas Integration
+* **Entity Management**: Creates and updates Atlas entities for Trino metadata
objects
+* **Relationship Mapping**: Establishes proper hierarchical relationships
between catalogs, schemas, tables, and columns
+* **Synchronization**: Maintains consistency by removing Atlas entities that
no longer exist in Trino
+* **Connector Support**: Specialized handling for Trino connectors whose
metadata Atlas captures through individual hooks, such as Hive and Iceberg
+
+### Scheduling & Automation
+* **Cron-based Scheduling**: Supports automated periodic extraction using cron
expressions
+* **One-time Execution**: Can be run as a single extraction job
+* **Error Handling**: Robust error handling with detailed logging
+
+## Architecture
+
+<SyntaxHighlighter language="text" style={theme.github}>
+{`
+┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
+│ Trino Cluster │ │ Trino Extractor │ │ Apache Atlas │
+│ │ │ │ │ │
+│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
+│ │ Catalogs │◄┼────┼►│ JDBC Client │ │ │ │ Entities │ │
+│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
+│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
+│ │ Schemas │ │ │ │ Extraction │◄┼────┼►│Relationships│ │
+│ └─────────────┘ │ │ │ Service │ │ │ └─────────────┘ │
+│ ┌─────────────┐ │ │ └─────────────┘ │ │ ┌─────────────┐ │
+│ │ Tables │ │ │ ┌─────────────┐ │ │ │ Lineage │ │
+│ └─────────────┘ │ │ │Atlas Client │◄┼────┼►│ Data │ │
+│ ┌─────────────┐ │ │ └─────────────┘ │ │ └─────────────┘ │
+│ │ Columns │ │ │ │ │ │
+│ └─────────────┘ │ └─────────────────┘ └─────────────────┘
+└─────────────────┘
+`}
+</SyntaxHighlighter>
+
+## Quick Start
+
+### 1. Configuration Setup
+
+Configure the `atlas-trino-extractor.properties` file:
+
+<SyntaxHighlighter language="properties" style={theme.github}>
+{`
+# Atlas connection
+atlas.rest.address=http://localhost:21000/
+# Trino connection
+atlas.trino.jdbc.address=jdbc:trino://localhost:8080/
+atlas.trino.jdbc.user=your-username
+# Catalogs to extract
+atlas.trino.catalogs.registered=hive_catalog,iceberg_catalog
+`}
+</SyntaxHighlighter>
+
+### 2. Basic Execution
+
+<SyntaxHighlighter language="bash" style={theme.github}>
+{`
+# Extract all registered catalogs
+./bin/run-trino-extractor.sh
+# Extract specific catalog
+./bin/run-trino-extractor.sh -c my_catalog
+# Schedule periodic extraction (every 6 hours)
+./bin/run-trino-extractor.sh -cx "0 0 */6 * * ?"
+`}
+</SyntaxHighlighter>
+
+## Configuration Properties
+
+| Property | Description | Default | Example |
+|----------|-------------|---------|---------|
+| `atlas.rest.address` | Atlas REST API endpoint | `http://localhost:21000/` |
`https://atlas.company.com:21443/` |
+| `atlas.trino.jdbc.address` | Trino JDBC URL | - |
`jdbc:trino://trino-server:8080/` |
+| `atlas.trino.jdbc.user` | Trino username | - | `admin` |
+| `atlas.trino.jdbc.password` | Trino password | `""` | `password123` |
+| `atlas.trino.namespace` | Trino instance namespace | `cm` |
`production-cluster` |
+| `atlas.trino.catalogs.registered` | Catalogs to extract | - |
`hive,iceberg,mysql` |
+| `atlas.trino.catalog.hook.enabled.<catalog-name>` | Whether an Atlas hook is
enabled for this catalog | `false` | `true` |
+| `atlas.trino.catalog.hook.enabled.<catalog-name>.namespace` | Namespace
under Atlas for this Hook | `cm` | `cm` |
+| `atlas.trino.extractor.schedule` | Cron expression | - | `0 0 2 * * ?` |
+
+## Command Line Usage
+
+### Available Options
+
+| Option | Long Form | Description | Example |
+|--------|-----------|-------------|---------|
+| `-c` | `--catalog` | Extract specific catalog | `-c hive_catalog` |
+| `-s` | `--schema` | Extract specific schema | `-s sales_data` |
+| `-t` | `--table` | Extract specific table | `-t customer_orders` |
+| `-cx` | `--cronExpression` | Schedule with cron expression | `-cx "0 0 2 * *
?"` |
+| `-h` | `--help` | Display help information | `-h` |
+
+## Connector-Specific Processing
+
+#### Example: Hive Connector Integration
+<SyntaxHighlighter language="properties" style={theme.github}>
+{`
+# Enable Hive hook integration
+atlas.trino.catalog.hook.enabled.hive_catalog=true
+atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
+`}
+</SyntaxHighlighter>
+
+#### Benefits:
+- Links Trino entities with existing Hive entities
+- Maintains consistency between Hive and Trino metadata
+- Supports environments with Atlas Hive hooks
+
+## FAQ
+
+### Troubleshooting Questions
+
+**Q: Why are some entities not appearing in Atlas?**
+
+A: Check catalog registration, permissions, and network connectivity. Review
logs for specific errors.
+
+**Q: How do I handle large clusters with thousands of tables?**
+
+A: Use selective extraction, increase memory allocation, schedule during
off-peak hours, and process catalogs individually.
+
+### Documentation
+- [Apache Atlas Documentation](https://atlas.apache.org/#/)
+- [Trino Documentation](https://trino.io/docs/)
+- [Atlas REST API Reference](https://atlas.apache.org/api/v2/)
+
+
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index c9fa2e4cd..a1c6cf87e 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -87,7 +87,7 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
private final AtlasJanusGraph graph;
private final JanusGraphManagement management;
private final Set<String> newMultProperties = new HashSet<>();
- private boolean isSuccess = false;
+ private boolean isSuccess;
public AtlasJanusGraphManagement(AtlasJanusGraph graph,
JanusGraphManagement managementSystem) {
this.management = managementSystem;
diff --git a/pom.xml b/pom.xml
index 9341140d4..ac2d4df6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
<module>addons/sqoop-bridge-shim</module>
<module>addons/storm-bridge</module>
<module>addons/storm-bridge-shim</module>
+ <module>addons/trino-extractor</module>
<module>atlas-examples</module>
<module>authorization</module>
<module>build-tools</module>
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 191dea810..d6aaab025 100755
---
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -20,7 +20,6 @@ package org.apache.atlas.repository.graph;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.discovery.SearchIndexer;
import org.apache.atlas.exception.AtlasBaseException;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index cf364a01d..0f9a4831e 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -22,7 +22,6 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraph;