This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch ATLAS-5021
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/ATLAS-5021 by this push:
new bc700ec20 ATLAS-5021: Extract Metadata from Trino periodically
new fbcbc6e6c ATLAS-5021: Extract Metadata from Trino periodically
bc700ec20 is described below
commit bc700ec20552ff24da2f5b3b2e63f8f6fdae1483
Author: Pinal Shah <[email protected]>
AuthorDate: Tue Apr 22 16:45:11 2025 +0700
ATLAS-5021: Extract Metadata from Trino periodically
---
addons/models/6000-Trino/6000-trino_model.json | 442 +++++++++++++++++++++
addons/trino-extractor/pom.xml | 93 +++++
.../src/main/bin/run-trino-extractor.sh | 138 +++++++
.../src/main/conf/atlas-application.properties | 32 ++
.../trino-extractor/src/main/conf/atlas-log4j.xml | 42 ++
.../apache/atlas/trino/cli/ExtractorContext.java | 106 +++++
.../apache/atlas/trino/cli/ExtractorService.java | 359 +++++++++++++++++
.../org/apache/atlas/trino/cli/TrinoExtractor.java | 182 +++++++++
.../atlas/trino/client/AtlasClientHelper.java | 431 ++++++++++++++++++++
.../atlas/trino/client/TrinoClientHelper.java | 132 ++++++
.../trino/connector/AtlasEntityConnector.java | 31 ++
.../atlas/trino/connector/ConnectorFactory.java | 37 ++
.../atlas/trino/connector/HiveEntityConnector.java | 125 ++++++
.../trino/connector/RdbmsEntityConnector.java | 43 ++
.../java/org/apache/atlas/trino/model/Catalog.java | 93 +++++
.../apache/atlas/trino/cli/TrinoExtractorIT.java | 42 ++
distro/pom.xml | 1 +
.../src/main/assemblies/atlas-trino-extractor.xml | 65 +++
pom.xml | 1 +
19 files changed, 2395 insertions(+)
diff --git a/addons/models/6000-Trino/6000-trino_model.json
b/addons/models/6000-Trino/6000-trino_model.json
new file mode 100644
index 000000000..66dee5e1e
--- /dev/null
+++ b/addons/models/6000-Trino/6000-trino_model.json
@@ -0,0 +1,442 @@
+{
+ "enumDefs": [],
+ "structDefs": [],
+ "classificationDefs": [],
+ "entityDefs": [
+ {
+ "name": "trino_instance",
+ "superTypes": ["DataSet"],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "hostname",
+ "typeName": "string",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "port",
+ "typeName": "int",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ }
+ ]
+ },
+ {
+ "name": "trino_catalog",
+ "superTypes": [
+ "Asset"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "connectorType",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "connectionUrl",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "isOptional": true
+ },
+ {
+ "name": "endPointUrl",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "isOptional": true
+      },
+      {
+ "name": "extraConfigs",
+ "typeName": "map<string,string>",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name": "trino_schema",
+ "superTypes": [
+ "Asset"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "parameters",
+ "typeName": "map<string,string>",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name": "trino_table",
+ "superTypes": [
+ "DataSet"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "comment",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ },
+ {
+ "name": "parameters",
+ "typeName": "map<string,string>",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ },
+ {
+ "name": "table_type",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name": "trino_column",
+ "superTypes": [
+ "DataSet"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "data_type",
+ "typeName": "string",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": true
+ },
+ {
+ "name": "ordinal_position",
+ "typeName": "int",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "column_default",
+ "typeName": "string",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "comment",
+ "typeName": "string",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "is_nullable",
+ "typeName": "boolean",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ },
+ {
+ "name": "isPrimaryKey",
+ "typeName": "boolean",
+ "isOptional": true,
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false
+ }
+
+ ]
+ },
+ {
+ "name" : "trino_column_lineage",
+ "superTypes" : [
+ "Process"
+ ],
+ "serviceType": "trino",
+ "typeVersion" : "1.0",
+ "attributeDefs" : [
+ {
+        "name": "dependencyType",
+ "typeName": "string",
+ "cardinality" : "SINGLE",
+ "isIndexable": false,
+ "isOptional": false,
+ "isUnique": false
+ },
+ {
+ "name": "expression",
+ "typeName": "string",
+ "cardinality" : "SINGLE",
+ "isIndexable": false,
+ "isOptional": true,
+ "isUnique": false
+ }
+ ]
+ },
+ {
+ "name": "trino_process",
+ "superTypes": [
+ "Process"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": [
+ {
+ "name": "queryId",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "searchWeight": 5
+ },
+ {
+ "name": "queryCreateTime",
+ "typeName": "long",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "searchWeight": 5
+ },
+ {
+ "name": "queryEndTime",
+ "typeName": "long",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "searchWeight": 5
+ },
+ {
+ "name": "queryText",
+ "typeName": "string",
+ "cardinality": "SINGLE",
+ "isUnique": false,
+ "isIndexable": false,
+ "searchWeight": 5
+ }
+ ]
+ },
+ {
+ "name": "trino_schema_ddl",
+ "superTypes": [
+ "ddl"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": []
+ },
+ {
+ "name": "trino_table_ddl",
+ "superTypes": [
+ "ddl"
+ ],
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "attributeDefs": []
+ }
+ ],
+ "relationshipDefs": [
+ {
+ "name": "trino_instance_catalog",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "trino_catalog",
+ "name": "instance",
+ "isContainer": false,
+ "cardinality": "SINGLE",
+ "isLegacyAttribute": false
+ },
+ "endDef2": {
+ "type": "trino_instance",
+ "name": "catalogs",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "propagateTags": "NONE"
+ },
+
+ {
+ "name": "trino_schema_catalog",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "AGGREGATION",
+ "endDef1": {
+ "type": "trino_schema",
+ "name": "catalog",
+ "isContainer": false,
+ "cardinality": "SINGLE",
+ "isLegacyAttribute": false
+ },
+ "endDef2": {
+ "type": "trino_catalog",
+ "name": "schemas",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "propagateTags": "NONE"
+ },
+
+ {
+ "name": "trino_table_schema",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "AGGREGATION",
+ "endDef1": {
+ "type": "trino_table",
+ "name": "trinoschema",
+ "isContainer": false,
+ "cardinality": "SINGLE",
+ "isLegacyAttribute": false
+ },
+ "endDef2": {
+ "type": "trino_schema",
+ "name": "tables",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "propagateTags": "NONE"
+ },
+ {
+ "name": "trino_table_columns",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "trino_table",
+ "name": "columns",
+ "isContainer": true,
+ "cardinality": "SET",
+ "isLegacyAttribute": false
+ },
+ "endDef2": {
+ "type": "trino_column",
+ "name": "table",
+ "isContainer": false,
+ "cardinality": "SINGLE",
+ "isLegacyAttribute": false
+ },
+ "propagateTags": "NONE"
+ },
+ {
+ "name": "trino_table_ddl_queries",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "trino_table",
+ "name": "ddlQueries",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "endDef2": {
+ "type": "trino_table_ddl",
+ "name": "table",
+ "isContainer": false,
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "NONE"
+ },
+ {
+ "name": "trino_schema_ddl_queries",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "COMPOSITION",
+ "endDef1": {
+ "type": "trino_schema",
+ "name": "ddlQueries",
+ "isContainer": true,
+ "cardinality": "SET"
+ },
+ "endDef2": {
+ "type": "trino_schema_ddl",
+ "name": "db",
+ "isContainer": false,
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "NONE"
+ },
+ {
+ "name": "trino_schema_hive_db",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "endDef1": {
+ "type": "hive_db",
+ "name": "trino_schema",
+ "cardinality": "SET"
+ },
+ "endDef2": {
+ "type": "trino_schema",
+ "name": "hive_db",
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "BOTH"
+ },
+ {
+ "name": "trino_table_hive_table",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "endDef1": {
+ "type": "hive_table",
+ "name": "trino_table",
+ "cardinality": "SET"
+ },
+ "endDef2": {
+ "type": "trino_table",
+ "name": "hive_table",
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "BOTH"
+ },
+ {
+ "name": "trino_column_hive_column",
+ "serviceType": "trino",
+ "typeVersion": "1.0",
+ "relationshipCategory": "ASSOCIATION",
+ "endDef1": {
+ "type": "hive_column",
+ "name": "trino_column",
+ "cardinality": "SET"
+ },
+ "endDef2": {
+ "type": "trino_column",
+ "name": "hive_column",
+ "cardinality": "SINGLE"
+ },
+ "propagateTags": "BOTH"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml
new file mode 100644
index 000000000..7ef285c7e
--- /dev/null
+++ b/addons/trino-extractor/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>apache-atlas</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+ <relativePath>../../</relativePath>
+ </parent>
+
+ <artifactId>atlas-trino-extractor</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Atlas Trino Bridge</name>
+ <description>Apache Atlas Trino Bridge Module</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>1.9</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.trino</groupId>
+ <artifactId>trino-jdbc</artifactId>
+ <version>403</version>
+ <!-- java8 supported version -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-client-v2</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.atlas</groupId>
+ <artifactId>atlas-intg</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>2.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.30</version>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <excludeScope>test</excludeScope>
+ <includeScope>compile</includeScope>
+
<outputDirectory>${project.build.directory}/dependency/trino</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>false</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
new file mode 100755
index 000000000..a7d4c8d71
--- /dev/null
+++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
@@ -0,0 +1,138 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+ ls=`ls -ld "${PRG}"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "${PRG}"`/"$link"
+ fi
+done
+
+BASEDIR=`dirname ${PRG}`
+BASEDIR=`cd ${BASEDIR}/..;pwd`
+
+if test -z "${JAVA_HOME}"
+then
+ JAVA_BIN=`which java`
+ JAR_BIN=`which jar`
+else
+ JAVA_BIN="${JAVA_HOME}/bin/java"
+ JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+ echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure
java and jar commands are available."
+ exit 1
+fi
+
+# Construct Atlas classpath using jars from hook/hive/atlas-hive-plugin-impl/
directory.
+for i in "${BASEDIR}/lib/"*.jar; do
+ ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then
+ ATLAS_CONF_DIR="${BASEDIR}/conf/"
+fi
+ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
+
+# log dir for applications
+ATLAS_LOG_DIR="${BASEDIR}/log"
+export ATLAS_LOG_DIR
+LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log"
+
+TIME=`date +%Y%m%d%H%M%S`
+
+CP="${ATLASCPPATH}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+ ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
+ LOGFILE=`cygpath -w ${LOGFILE}`
+ HIVE_CP=`cygpath -w ${HIVE_CP}`
+ HADOOP_CP=`cygpath -w ${HADOOP_CP}`
+ CP=`cygpath -w -p ${CP}`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR
-Datlas.log.file=atlas-trino-extractor.log
+-Dlog4j.configuration=atlas-log4j.xml -Djdk.httpclient.HttpClient.log=requests
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
+
+IMPORT_ARGS=()
+JVM_ARGS=
+
+set -f
+while true
+do
+ option=${1}
+ shift
+
+ case "${option}" in
+ -c) IMPORT_ARGS+=("-c" "$1"); shift;;
+ -s) IMPORT_ARGS+=("-s" "$1"); shift;;
+ -t) IMPORT_ARGS+=("-t" "$1"); shift;;
+ -cx)
+ CRON_EXPR="$1"
+ shift
+ while [[ "$1" != "" && "$1" != -* ]]; do
+ CRON_EXPR="$CRON_EXPR $1"
+ shift
+ done
+ IMPORT_ARGS+=("-cx" "$CRON_EXPR");;
+ -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");;
+ --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;;
+ --table) IMPORT_ARGS+=("--table" "$1"); shift;;
+ --schema) IMPORT_ARGS+=("--schema" "$1"); shift;;
+ --cronExpression)
+ CRON_EXPR="$1"
+ shift
+ while [[ "$1" != "" && "$1" != -* ]]; do
+ CRON_EXPR="$CRON_EXPR $1"
+ shift
+ done
+ IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");;
+ --help) export HELP_OPTION="true"; IMPORT_ARGS+=("--help");;
+ -*)
+ echo "Invalid argument found"
+ export HELP_OPTION="true"; IMPORT_ARGS+=("--help")
+ break;;
+ "") break;;
+ esac
+done
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+
+if [ -z ${HELP_OPTION} ]; then
+ echo "Log file for import is $LOGFILE"
+fi
+
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}"
org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}"
+
+RETVAL=$?
+
+set +f
+if [ -z ${HELP_OPTION} ]; then
+ [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully!
+ [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at:
$LOGFILE for details.
+fi
+
+exit $RETVAL
diff --git a/addons/trino-extractor/src/main/conf/atlas-application.properties
b/addons/trino-extractor/src/main/conf/atlas-application.properties
new file mode 100755
index 000000000..d709ad016
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-application.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######## Atlas connection ############
+atlas.rest.address=http://localhost:21000/
+
+######## Trino connection ############
+atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/
+atlas.trino.jdbc.user=<username>
+
+######## Trino environment name ######
+atlas.trino.namespace=cm
+#atlas.trino.catalogs.registered=
+
+######## Datasource for which ########
+######## Atlas hook is enabled #######
+#atlas.trino.catalog.hook.enabled.hive_catalog=true
+#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
\ No newline at end of file
diff --git a/addons/trino-extractor/src/main/conf/atlas-log4j.xml
b/addons/trino-extractor/src/main/conf/atlas-log4j.xml
new file mode 100644
index 000000000..371c3b3e3
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-log4j.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+ <param name="Append" value="true"/>
+ <param name="maxFileSize" value="100MB" />
+ <param name="maxBackupIndex" value="20" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m
(%C{1}:%L)%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.atlas.trino" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <root>
+ <priority value="warn"/>
+ <appender-ref ref="FILE"/>
+ </root>
+</log4j:configuration>
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
new file mode 100644
index 000000000..d20c142b5
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+
+public class ExtractorContext {
+
+ static final String TRINO_NAMESPACE_CONF = "atlas.trino.namespace";
+ static final String DEFAULT_TRINO_NAMESPACE = "cm";
+ static final String OPTION_CATALOG_SHORT = "c";
+ static final String OPTION_CATALOG_LONG = "catalog";
+ static final String OPTION_SCHEMA_SHORT = "s";
+ static final String OPTION_SCHEMA_LONG = "schema";
+ static final String OPTION_TABLE_SHORT = "t";
+ static final String OPTION_TABLE_LONG = "table";
+ static final String OPTION_CRON_EXPRESSION_SHORT = "cx";
+ static final String OPTION_CRON_EXPRESSION_LONG = "cronExpression";
+ static final String OPTION_HELP_SHORT = "h";
+ static final String OPTION_HELP_LONG = "help";
+ private final Configuration atlasConf;
+ private String namespace;
+ private String catalog;
+ private String schema;
+ private String table;
+ private AtlasClientHelper atlasClientHelper;
+ private TrinoClientHelper trinoClientHelper;
+ private String cronExpression;
+
+ public ExtractorContext(CommandLine cmd) throws AtlasException,
IOException {
+ this.atlasConf = getAtlasProperties();
+ this.atlasClientHelper = createAtlasClientHelper();
+ this.trinoClientHelper = createTrinoClientHelper();
+ this.namespace = atlasConf.getString(TRINO_NAMESPACE_CONF,
DEFAULT_TRINO_NAMESPACE);
+ this.catalog = cmd.getOptionValue(OPTION_CATALOG_SHORT);
+ this.schema = cmd.getOptionValue(OPTION_SCHEMA_SHORT);
+ this.table = cmd.getOptionValue(OPTION_TABLE_SHORT);
+ this.cronExpression =
cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT);
+ }
+
+ public Configuration getAtlasConf() {
+ return atlasConf;
+ }
+
+ public AtlasClientHelper getAtlasConnector() {
+ return atlasClientHelper;
+ }
+
+ public TrinoClientHelper getTrinoConnector() {
+ return trinoClientHelper;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getCatalog() {
+ return catalog;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ public String getCronExpression() {
+ return cronExpression;
+ }
+
+ private Configuration getAtlasProperties() throws AtlasException {
+ return ApplicationProperties.get();
+ }
+
+ private TrinoClientHelper createTrinoClientHelper() {
+ return new TrinoClientHelper(atlasConf);
+ }
+
+ private AtlasClientHelper createAtlasClientHelper() throws IOException {
+ return new AtlasClientHelper(atlasConf);
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
new file mode 100644
index 000000000..ca64c0d0c
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+ private static final Logger LOG =
LoggerFactory.getLogger(ExtractorService.class);
+
+ public static final int THREAD_POOL_SIZE = 5;
+ public static final int CATALOG_EXECUTION_TIMEOUT = 60;
+ public static final String TRINO_NAME_ATTRIBUTE = "name";
+ private static final String TRINO_CATALOG_REGISTERED =
"atlas.trino.catalogs.registered";
+ private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX =
"atlas.trino.catalog.hook.enabled.";
+ private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX =
".namespace";
+ private static Configuration atlasProperties;
+ private static TrinoClientHelper trinoClientHelper;
+ private static AtlasClientHelper atlasClientHelper;
+ private static String trinoNamespace;
+ private ExtractorContext context;
+
+ public boolean execute(ExtractorContext context) throws Exception {
+ this.context = context;
+ atlasProperties = context.getAtlasConf();
+ trinoClientHelper = context.getTrinoConnector();
+ atlasClientHelper = context.getAtlasConnector();
+ trinoNamespace = context.getNamespace();
+
+ Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+ LOG.info("Found {} catalogs in Trino", catalogs.toString());
+
+ try {
+ processCatalogs(context, catalogs);
+ deleteCatalogs(context, catalogs);
+ } catch (AtlasServiceException e) {
+ throw new AtlasServiceException(e);
+ }
+ return true;
+ }
+
+ public void processCatalogs(ExtractorContext context, Map<String, String>
catalogInTrino) throws AtlasServiceException {
+ if (MapUtils.isEmpty(catalogInTrino)) {
+ LOG.debug("No catalogs found under Trino");
+ return;
+ }
+
+ List<Catalog> catalogsToProcess = new ArrayList<>();
+
+ if (StringUtils.isEmpty(context.getCatalog())) {
+ String[] registeredCatalogs =
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+ if (registeredCatalogs != null) {
+ for (String registeredCatalog : registeredCatalogs) {
+ if (catalogInTrino.containsKey(registeredCatalog)) {
+
catalogsToProcess.add(getCatalogInstance(registeredCatalog,
catalogInTrino.get(registeredCatalog)));
+ }
+ }
+ }
+ } else {
+ if (catalogInTrino.containsKey(context.getCatalog())) {
+ Catalog catalog = getCatalogInstance(context.getCatalog(),
catalogInTrino.get(context.getCatalog()));
+ catalog.setSchemaToImport(context.getSchema());
+ catalog.setTableToImport(context.getTable());
+ catalogsToProcess.add(catalog);
+ }
+ }
+
+ if (CollectionUtils.isEmpty(catalogsToProcess)) {
+ LOG.warn("No catalogs found to process");
+ return;
+ } else {
+ LOG.info("{} catalogs to be extracted",
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+ }
+
+ AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity =
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+ ExecutorService catalogExecutor =
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(),
THREAD_POOL_SIZE));
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (Catalog currentCatalog : catalogsToProcess) {
+ futures.add(catalogExecutor.submit(() -> {
+ try {
+ currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+ processCatalog(currentCatalog);
+ } catch (Exception e) {
+ LOG.error("Error processing catalog: {}", currentCatalog,
e);
+ }
+ }));
+ }
+ catalogExecutor.shutdown();
+
+ try {
+ if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT,
TimeUnit.MINUTES)) {
+ LOG.warn("Catalog processing did not complete within the
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Catalog processing was interrupted", e);
+ }
+
+ LOG.info("Catalogs scanned for creation/updation completed");
+ }
+
+ public void processCatalog(Catalog catalog) throws AtlasServiceException,
SQLException {
+ if (catalog != null) {
+ LOG.info("Started extracting {} catalog:", catalog.getName());
+ String catalogName = catalog.getName();
+
+ AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity =
AtlasClientHelper.createOrUpdateCatalogEntity(catalog);
+
+ List<String> schemas =
trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+ LOG.info("Found {} schema under {} catalog", schemas.size(),
catalogName);
+
+ processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas);
+
+ if (StringUtils.isEmpty(context.getSchema())) {
+ deleteSchemas(schemas,
trinoCatalogEntity.getEntity().getGuid());
+ }
+ }
+ }
+
+ public void processSchemas(Catalog catalog, AtlasEntity
trinoCatalogEntity, List<String> schemaToImport) {
+ for (String schemaName : schemaToImport) {
+ LOG.info("Started extracting {} schema:", schemaName);
+ try {
+ AtlasEntity.AtlasEntityWithExtInfo schemaEntity =
AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity,
schemaName);
+
+ List<String> tables =
trinoClientHelper.getTrinoTables(catalog.getName(), schemaName,
catalog.getTableToImport());
+ LOG.info("Found {} tables under {}.{} catalog.schema",
tables.size(), catalog.getName(), schemaName);
+
+ processTables(catalog, schemaName, schemaEntity.getEntity(),
tables);
+
+ if (StringUtils.isEmpty(context.getTable())) {
+ deleteTables(tables, schemaEntity.getEntity().getGuid());
+ }
+ } catch (Exception e) {
+ LOG.error("Error processing schema: {}", schemaName);
+ }
+ }
+ }
+
+ public void processTables(Catalog catalog, String schemaName, AtlasEntity
schemaEntity, List<String> trinoTables) {
+ for (String trinoTableName : trinoTables) {
+ LOG.info("Started extracting {} table:", trinoTableName);
+
+ try {
+ Map<String, Map<String, Object>> trinoColumns =
trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName,
trinoTableName);
+ LOG.info("Found {} columns under {}.{}.{}
catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName,
trinoTableName);
+
+ AtlasClientHelper.createOrUpdateTableEntity(catalog,
schemaName, trinoTableName, trinoColumns, schemaEntity);
+ } catch (Exception e) {
+ LOG.error("Error processing table: {}", trinoTableName, e);
+ }
+ }
+ }
+
+ public void deleteCatalogs(ExtractorContext context, Map<String, String>
catalogInTrino) throws AtlasServiceException {
+ if (StringUtils.isNotEmpty(context.getCatalog())) {
+ return;
+ }
+
+ AtlasEntityHeader trinoInstance =
AtlasClientHelper.getTrinoInstance(trinoNamespace);
+ if (trinoInstance != null) {
+ Set<String> catalogsToDelete = getCatalogsToDelete(catalogInTrino,
trinoInstance.getGuid());
+
+ if (CollectionUtils.isNotEmpty(catalogsToDelete)) {
+ LOG.info("{} non existing catalogs to be deleted",
catalogsToDelete, trinoInstance.getGuid());
+
+ for (String catalogGuid : catalogsToDelete) {
+ try {
+ deleteSchemas(null, catalogGuid);
+
AtlasClientHelper.deleteByGuid(Collections.singleton(catalogGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting catalog: {}", catalogGuid,
e);
+ }
+ }
+ } else {
+ LOG.info("No catalogs found to delete");
+ }
+ }
+
+ LOG.info("Catalogs scanned for deletion completed");
+ }
+
+ public void deleteSchemas(List<String> schemasInTrino, String catalogGuid)
{
+ try {
+ Set<String> schemasToDelete = getSchemasToDelete(schemasInTrino,
catalogGuid);
+
+ if (CollectionUtils.isNotEmpty(schemasToDelete)) {
+ LOG.info("{} non existing schemas under {} catalog found,
starting to delete", schemasToDelete, catalogGuid);
+
+ for (String schemaGuid : schemasToDelete) {
+ try {
+ deleteTables(null, schemaGuid);
+
AtlasClientHelper.deleteByGuid(Collections.singleton(schemaGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error in deleting schema: {}", schemaGuid,
e);
+ }
+ }
+ } else {
+ LOG.info("No schemas found under {} catalog to delete",
catalogGuid);
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error in deleting schemas ", catalogGuid, e);
+ }
+ }
+
+ private void deleteTables(List<String> tablesInTrino, String schemaGuid) {
+ try {
+ Set<String> tablesToDelete = getTablesToDelete(tablesInTrino,
schemaGuid);
+
+ if (CollectionUtils.isNotEmpty(tablesToDelete)) {
+ LOG.info("{} non existing tables under {} schema found,
starting to delete", tablesToDelete, schemaGuid);
+
+ for (String tableGuid : tablesToDelete) {
+ try {
+
AtlasClientHelper.deleteByGuid(Collections.singleton(tableGuid));
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting table: {}", tableGuid, e);
+ }
+ }
+ } else {
+ LOG.info("No non existing tables found under {} schema",
schemaGuid);
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error deleting tables under schema: {}", schemaGuid, e);
+ }
+ }
+
+ private static Catalog getCatalogInstance(String catalogName, String
connectorType) {
+ if (catalogName == null) {
+ return null;
+ }
+
+ boolean isHookEnabled =
atlasProperties.getBoolean(TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName,
false);
+ String hookNamespace = null;
+ final String HOOK_NAMESPACE_PROPERTY =
TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName +
TRINO_CATALOG_HOOK_ENABLED_SUFFIX;
+ if (isHookEnabled) {
+ hookNamespace = atlasProperties.getString(HOOK_NAMESPACE_PROPERTY);
+
+ if (hookNamespace == null) {
+ LOG.warn("Atlas Hook is enabled for {}, but '{}' found empty",
catalogName, HOOK_NAMESPACE_PROPERTY);
+ }
+ }
+
+ Catalog catalog = new Catalog(catalogName, connectorType,
isHookEnabled, hookNamespace, trinoNamespace);
+ return catalog;
+ }
+
+ private static Set<String> getCatalogsToDelete(Map<String, String>
catalogInTrino, String instanceGuid) throws AtlasServiceException {
+
+ if (instanceGuid != null) {
+
+ List<AtlasEntityHeader> catalogsInAtlas =
AtlasClientHelper.getAllCatalogsInInstance(instanceGuid);
+ if (catalogsInAtlas != null) {
+
+ if (catalogInTrino == null) {
+ catalogInTrino = new HashMap<>();
+ }
+
+ Map<String, String> finalCatalogInTrino = catalogInTrino;
+ return catalogsInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalCatalogInTrino.containsKey(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return new HashSet<>();
+ }
+
+ private static Set<String> getSchemasToDelete(List<String> schemasInTrino,
String catalogGuid) throws AtlasServiceException {
+
+ if (catalogGuid != null) {
+
+ List<AtlasEntityHeader> schemasInAtlas =
AtlasClientHelper.getAllSchemasInCatalog(catalogGuid);
+ if (schemasInAtlas != null) {
+
+ if (schemasInTrino == null) {
+ schemasInTrino = new ArrayList<>();
+ }
+
+ List<String> finalSchemasInTrino = schemasInTrino;
+ return schemasInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalSchemasInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return new HashSet<>();
+ }
+
+ private static Set<String> getTablesToDelete(List<String> tablesInTrino,
String schemaGuid) throws AtlasServiceException {
+
+ if (schemaGuid != null) {
+
+ List<AtlasEntityHeader> tablesInAtlas =
AtlasClientHelper.getAllTablesInSchema(schemaGuid);
+ if (tablesInAtlas != null) {
+
+ if (tablesInTrino == null) {
+ tablesInTrino = new ArrayList<>();
+ }
+
+ List<String> finalTablesInTrino = tablesInTrino;
+ return tablesInAtlas.stream()
+ .filter(entity ->
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+ .filter(entity ->
!finalTablesInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+ .map(AtlasEntityHeader::getGuid)
+ .collect(Collectors.toSet());
+ }
+ }
+
+ return new HashSet<>();
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
new file mode 100644
index 000000000..4a0ebe085
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerFactory;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT;
+import static
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG;
+import static
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT;
+
+public class TrinoExtractor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TrinoExtractor.class);
+
+ private static final int EXIT_CODE_SUCCESS = 0;
+ private static final int EXIT_CODE_FAILED = 1;
+ private static final int EXIT_CODE_HELP = 2;
+ private static int exitCode = EXIT_CODE_FAILED;
+
+ private static ExtractorContext extractorContext;
+
+ public static void main(String[] args) {
+ try {
+ extractorContext = createExtractorContext(args);
+ if (extractorContext != null) {
+ String cronExpression = extractorContext.getCronExpression();
+
+ if (StringUtils.isNotEmpty(cronExpression)) {
+
+ if (!CronExpression.isValidExpression(cronExpression)) {
+ LOG.error("Invalid cron expression provided: {}",
cronExpression);
+ } else {
+ LOG.info("Cron Expression found, scheduling the job
for {}", cronExpression);
+ SchedulerFactory sf = new StdSchedulerFactory();
+ Scheduler scheduler = sf.getScheduler();
+
+ JobDetail job =
JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob",
"group1").build();
+ Trigger trigger = TriggerBuilder.newTrigger()
+
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow()
+ .build();
+
+ scheduler.scheduleJob(job, trigger);
+ scheduler.start();
+ Thread.currentThread().join();
+ }
+ } else {
+ LOG.warn("Cron Expression missing, hence will run the job
once");
+ ExtractorService extractorService = new ExtractorService();
+ if (extractorService.execute(extractorContext)) {
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error encountered.", e);
+ System.out.println("exitCode : " + exitCode);
+ System.out.println("Error encountered." + e);
+ } finally {
+ if (extractorContext != null &&
extractorContext.getAtlasConnector() != null) {
+ extractorContext.getAtlasConnector().close();
+ }
+ }
+
+ System.exit(exitCode);
+ }
+
+ static ExtractorContext createExtractorContext(String[] args) throws
AtlasBaseException, IOException {
+ Options acceptedCliOptions = prepareCommandLineOptions();
+
+ try {
+ CommandLine cmd = new BasicParser().parse(acceptedCliOptions,
args, true);
+ List<String> argsNotProcessed = cmd.getArgList();
+
+ if (argsNotProcessed != null && argsNotProcessed.size() > 0) {
+ throw new AtlasBaseException("Unrecognized arguments.");
+ }
+
+ ExtractorContext ret = null;
+ if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) {
+ printUsage(acceptedCliOptions);
+ exitCode = EXIT_CODE_HELP;
+ } else {
+ ret = new ExtractorContext(cmd);
+ LOG.debug("Successfully initialized the extractor context.");
+ }
+
+ return ret;
+ } catch (ParseException | AtlasBaseException e) {
+ printUsage(acceptedCliOptions);
+
+ throw new AtlasBaseException("Invalid arguments. Reason: " +
e.getMessage(), e);
+ } catch (AtlasException e) {
+ throw new AtlasBaseException("Error in getting Application
Properties. Reason: " + e.getMessage(), e);
+ } catch (IOException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static Options prepareCommandLineOptions() {
+ Options acceptedCliOptions = new Options();
+
+ return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT,
OPTION_CATALOG_LONG, true, "Catalog name")
+ .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true,
"Schema name")
+ .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table
name")
+ .addOption(OPTION_CRON_EXPRESSION_SHORT,
OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction")
+ .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print
help message");
+ }
+
+ private static void printUsage(Options options) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp(TrinoExtractor.class.getName(), options);
+ }
+
+ @DisallowConcurrentExecution
+ public static class MetadataJob implements Job {
+ private final Logger LOG = LoggerFactory.getLogger(MetadataJob.class);
+
+ public void execute(JobExecutionContext context) throws
JobExecutionException {
+ if (extractorContext != null) {
+ LOG.info("Executing metadata extraction at: {}",
java.time.LocalTime.now());
+ ExtractorService extractorService = new ExtractorService();
+
+ try {
+ if (extractorService.execute(extractorContext)) {
+ exitCode = EXIT_CODE_SUCCESS;
+ }
+ } catch (Exception e) {
+ LOG.error("Error encountered: ", e);
+ throw new JobExecutionException(e);
+ }
+ LOG.info("Completed executing metadata extraction at: {}",
java.time.LocalTime.now());
+ }
+ }
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
new file mode 100644
index 000000000..5ff2ff2a9
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
+public class AtlasClientHelper {
+ private static final Logger LOG =
LoggerFactory.getLogger(AtlasClientHelper.class);
+
+ public static final String TRINO_INSTANCE =
"trino_instance";
+ public static final String TRINO_CATALOG =
"trino_catalog";
+ public static final String TRINO_SCHEMA =
"trino_schema";
+ public static final String TRINO_TABLE =
"trino_table";
+ public static final String TRINO_COLUMN =
"trino_column";
+ public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs";
+ public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE = "schemas";
+ public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE = "tables";
+ public static final String QUALIFIED_NAME_ATTRIBUTE =
"qualifiedName";
+ public static final String NAME_ATTRIBUTE = "name";
+ public static final int pageLimit =
10000;
+ private static final String DEFAULT_ATLAS_URL =
"http://localhost:21000/";
+ private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT =
"atlas.rest.address";
+ private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE =
"connectorType";
+ private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE =
"instance";
+ private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP =
"trino_instance_catalog";
+ private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE =
"catalog";
+ private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP =
"trino_schema_catalog";
+ private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE =
"data_type";
+ private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE =
"ordinal_position";
+ private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE =
"column_default";
+ private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE =
"is_nullable";
+ private static final String TRINO_COLUMN_TABLE_ATTRIBUTE =
"table";
+ private static final String TRINO_TABLE_COLUMN_RELATIONSHIP =
"trino_table_columns";
+ private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP =
"trino_table_schema";
+ private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE =
"trinoschema";
+ private static final String TRINO_TABLE_COLUMN_ATTRIBUTE =
"columns";
+
+ private static AtlasClientV2 atlasClientV2;
+
    /**
     * Creates the helper, initializing (or reusing) the shared AtlasClientV2.
     * NOTE(review): the constructor assigns a static field, so every instance
     * shares the single client created by the first call.
     */
    public AtlasClientHelper(Configuration atlasConf) throws IOException {
        atlasClientV2 = getAtlasClientV2Instance(atlasConf);
    }
+
    /**
     * Lazily creates the shared AtlasClientV2 (synchronized lazy singleton).
     * Endpoint comes from atlas.rest.address, defaulting to http://localhost:21000/.
     * Authentication is basic-auth unless Kerberos is enabled, in which case the
     * current UGI user is used.
     *
     * @throws IOException when the current Kerberos user cannot be resolved
     */
    public static synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration atlasConf) throws IOException {
        if (atlasClientV2 == null) {
            String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};

            if (atlasConf != null && ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT))) {
                atlasEndpoint = atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
            }

            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                // basic-auth path: credentials resolved by AuthenticationUtil (may prompt/read config)
                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();
                atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
            } else {
                // Kerberos path: authenticate as the currently logged-in UGI user
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
            }
        }
        return atlasClientV2;
    }
+
+ public static List<AtlasEntityHeader> getAllCatalogsInInstance(String
instanceGuid) throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE);
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} catalogs of {} trino instance",
entities.size(), instanceGuid);
+ return entities;
+ } else {
+ LOG.debug("No catalog found under {} trino instance",
instanceGuid);
+ return null;
+ }
+ }
+
+ public static List<AtlasEntityHeader> getAllSchemasInCatalog(String
catalogGuid) throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE);
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} schemas of {} trino catalog",
entities.size(), catalogGuid);
+ return entities;
+ } else {
+ LOG.debug("No schema found under {} trino catalog", catalogGuid);
+ return null;
+ }
+ }
+
+ public static List<AtlasEntityHeader> getAllTablesInSchema(String
schemaGuid) throws AtlasServiceException {
+
+ List<AtlasEntityHeader> entities =
getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE);
+ if (CollectionUtils.isNotEmpty(entities)) {
+ LOG.debug("Retrieved {} tables of {} trino schema",
entities.size(), schemaGuid);
+ return entities;
+ } else {
+ LOG.debug("No table found under {} trino schema", schemaGuid);
+ return null;
+ }
+ }
+
    /**
     * Pages through Atlas's relationship search for the given entity/relationship
     * attribute and accumulates all related entity headers.
     *
     * @param entityGuid                guid of the source entity; null returns null
     * @param relationshipAttributeName relationship attribute to traverse (e.g. "catalogs")
     * @return all related entity headers (possibly empty), or null when entityGuid is null
     * @throws AtlasServiceException when the search call fails
     */
    public static List<AtlasEntityHeader> getAllRelationshipEntities(String entityGuid, String relationshipAttributeName) throws AtlasServiceException {
        if (entityGuid == null) {
            return null;
        }
        List<AtlasEntityHeader> entities = new ArrayList<>();
        final int pageSize = pageLimit;

        // fetch pageSize results at a time until a short (or empty) page signals the end
        for (int i = 0; ; i++) {
            int offset = pageSize * i;
            LOG.debug("Retrieving: offset={}, pageSize={}", offset, pageSize);

            AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null, null, true, pageSize, offset);

            // a null result or null entity list is treated the same as an empty page
            List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
            int count = entityHeaders == null ? 0 : entityHeaders.size();

            if (count > 0) {
                entities.addAll(entityHeaders);
            }

            if (count < pageSize) { // last page
                break;
            }
        }

        return entities;
    }
+
    /**
     * Looks up the trino_instance entity header whose qualifiedName equals the
     * given namespace. Returns null when the lookup throws — note the
     * AtlasServiceException is swallowed, so "not found" and genuine service
     * errors are indistinguishable to the caller.
     */
    public static AtlasEntityHeader getTrinoInstance(String namespace) {
        try {
            return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE, Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace));
        } catch (AtlasServiceException e) {
            // treated as "instance does not exist"
            return null;
        }
    }
+
    /**
     * Returns the trino_instance entity for the given namespace, creating it when
     * absent (qualifiedName and name are both set to the namespace). An existing
     * instance entity is returned as-is — no update is performed.
     *
     * @throws AtlasServiceException when the find/create call fails
     */
    public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String trinoNamespace) throws AtlasServiceException {
        String                             qualifiedName = trinoNamespace;
        AtlasEntity.AtlasEntityWithExtInfo ret           = findEntity(TRINO_INSTANCE, qualifiedName, true, true);

        if (ret == null) {
            ret = new AtlasEntity.AtlasEntityWithExtInfo();
            AtlasEntity entity = new AtlasEntity(TRINO_INSTANCE);

            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
            entity.setAttribute(NAME_ATTRIBUTE, trinoNamespace);

            ret.setEntity(entity);
            ret = createEntity(ret);
        }
        return ret;
    }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo
createOrUpdateCatalogEntity(Catalog catalog) throws AtlasServiceException {
+ String catalogName = catalog.getName();
+ String trinoNamespace = catalog.getInstanceName();
+ String qualifiedName = String.format("%s@%s", catalogName,
trinoNamespace);
+
+ AtlasEntity.AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG,
qualifiedName, true, true);
+ if (ret == null) {
+ ret = new AtlasEntity.AtlasEntityWithExtInfo();
+ AtlasEntity entity = new AtlasEntity(TRINO_CATALOG);
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, catalogName);
+ entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE,
catalog.getType());
+ entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+
catalog.getConnector().connectTrinoCatalog(catalog.getHookInstanceName(),
catalogName, entity, ret);
+ }
+ ret.setEntity(entity);
+ ret = createEntity(ret);
+ } else {
+ AtlasEntity entity = ret.getEntity();
+ entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+
catalog.getConnector().connectTrinoCatalog(catalog.getHookInstanceName(),
catalogName, entity, ret);
+ }
+ ret.setEntity(entity);
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo
createOrUpdateSchemaEntity(Catalog catalog, AtlasEntity catalogEntity, String
schema) throws AtlasServiceException {
+ String qualifiedName = String.format("%s.%s@%s", catalog.getName(),
schema, catalog.getInstanceName());
+
+ AtlasEntity.AtlasEntityWithExtInfo ret = findEntity(TRINO_SCHEMA,
qualifiedName, true, true);
+
+ if (ret == null) {
+ ret = new AtlasEntity.AtlasEntityWithExtInfo();
+ AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA);
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, schema);
+ entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity,
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+
catalog.getConnector().connectTrinoSchema(catalog.getHookInstanceName(),
catalog.getName(), schema, entity, ret);
+ }
+
+ ret.setEntity(entity);
+ ret = createEntity(ret);
+ } else {
+ AtlasEntity entity = ret.getEntity();
+ entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity,
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+
catalog.getConnector().connectTrinoSchema(catalog.getHookInstanceName(),
catalog.getName(), schema, entity, ret);
+ }
+ ret.setEntity(entity);
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo
createOrUpdateTableEntity(Catalog catalog, String schema, String table,
Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws
AtlasServiceException {
+ String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(),
schema, table, catalog.getInstanceName());
+
+ AtlasEntity.AtlasEntityWithExtInfo ret;
+ AtlasEntity.AtlasEntityWithExtInfo tableEntityExt =
findEntity(TRINO_TABLE, qualifiedName, true, true);
+
+ if (tableEntityExt == null) {
+ tableEntityExt = toTableEntity(catalog, schema, table,
trinoColumns, schemaEntity, tableEntityExt);
+ ret = createEntity(tableEntityExt);
+ } else {
+ ret = toTableEntity(catalog, schema, table, trinoColumns,
schemaEntity, tableEntityExt);
+ updateEntity(ret);
+ }
+
+ return ret;
+ }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo toTableEntity(Catalog
catalog, String schema, String table, Map<String, Map<String, Object>>
trinoColumns, AtlasEntity schemaEntity, AtlasEntity.AtlasEntityWithExtInfo
tableEntityExt) {
+ if (tableEntityExt == null) {
+ tableEntityExt = new AtlasEntity.AtlasEntityWithExtInfo(new
AtlasEntity(TRINO_TABLE));
+ }
+
+ String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(),
schema, table, catalog.getInstanceName());
+
+ AtlasEntity tableEntity = tableEntityExt.getEntity();
+ tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+ tableEntity.setAttribute(NAME_ATTRIBUTE, table);
+
+ List<AtlasEntity> columnEntities = new ArrayList<>();
+ for (Map.Entry<String, Map<String, Object>> columnEntry :
trinoColumns.entrySet()) {
+ AtlasEntity entity = new AtlasEntity(TRINO_COLUMN);
+
+ String columnName = columnEntry.getKey();
+ String colQualifiedName = String.format("%s.%s.%s.%s@%s",
catalog.getName(), schema, table, columnName, catalog.getInstanceName());
+
+ entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName);
+ entity.setAttribute(NAME_ATTRIBUTE, columnName);
+ if (MapUtils.isNotEmpty(columnEntry.getValue())) {
+ Map<String, Object> columnAttr = columnEntry.getValue();
+ entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE));
+ entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE,
columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE));
+ }
+
+ entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity,
TRINO_TABLE_COLUMN_RELATIONSHIP));
+ columnEntities.add(entity);
+ }
+
+ tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity,
TRINO_TABLE_SCHEMA_RELATIONSHIP));
+ tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities,
TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+ if (catalog.getConnector() != null) {
+
catalog.getConnector().connectTrinoTable(catalog.getHookInstanceName(),
catalog.getName(), schema, table, tableEntity, columnEntities, tableEntityExt);
+ }
+
+ tableEntityExt.addReferredEntity(schemaEntity);
+ if (columnEntities != null) {
+ for (AtlasEntity column : columnEntities) {
+ tableEntityExt.addReferredEntity(column);
+ }
+ }
+
+ tableEntityExt.setEntity(tableEntity);
+ return tableEntityExt;
+ }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo findEntity(final String
typeName, final String qualifiedName, boolean minExtInfo, boolean
ignoreRelationship) throws AtlasServiceException {
+ AtlasEntity.AtlasEntityWithExtInfo ret = null;
+
+ try {
+ ret = atlasClientV2.getEntityByAttribute(typeName,
Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo,
ignoreRelationship);
+ } catch (AtlasServiceException e) {
+ if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+ return null;
+ }
+
+ throw e;
+ }
+ return ret;
+ }
+
+ public static AtlasEntity.AtlasEntityWithExtInfo
createEntity(AtlasEntity.AtlasEntityWithExtInfo entity) throws
AtlasServiceException {
+ LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(),
entity);
+
+ AtlasEntity.AtlasEntityWithExtInfo ret = null;
+ EntityMutationResponse response =
atlasClientV2.createEntity(entity);
+ List<AtlasEntityHeader> createdEntities =
response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
+
+ if (CollectionUtils.isNotEmpty(createdEntities)) {
+ for (AtlasEntityHeader createdEntity : createdEntities) {
+ if (ret == null) {
+ ret =
atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+ LOG.debug("Created {} entity: name={}, guid={}",
ret.getEntity().getTypeName(),
ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME),
ret.getEntity().getGuid());
+ } else if (ret.getEntity(createdEntity.getGuid()) == null) {
+ AtlasEntity.AtlasEntityWithExtInfo newEntity =
atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+ ret.addReferredEntity(newEntity.getEntity());
+
+ if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
+ for (Map.Entry<String, AtlasEntity> entry :
newEntity.getReferredEntities().entrySet()) {
+ ret.addReferredEntity(entry.getKey(),
entry.getValue());
+ }
+ }
+
+ LOG.debug("Created {} entity: name={}, guid={}",
newEntity.getEntity().getTypeName(),
newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME),
newEntity.getEntity().getGuid());
+ }
+ }
+ }
+
+ clearRelationshipAttributes(ret);
+
+ return ret;
+ }
+
+ public static void deleteByGuid(Set<String> guidTodelete) throws
AtlasServiceException {
+
+ if (CollectionUtils.isNotEmpty(guidTodelete)) {
+
+ for (String guid : guidTodelete) {
+ EntityMutationResponse response =
atlasClientV2.deleteEntityByGuid(guid);
+
+ if (response == null || response.getDeletedEntities().size() <
1) {
+ LOG.debug("Entity with guid : {} is not deleted", guid);
+ } else {
+ LOG.debug("Entity with guid : {} is deleted", guid);
+ }
+ }
+ }
+ }
+
    /** Closes the shared Atlas REST client (atlasClientV2) held by this helper. */
    public static void close() {
        atlasClientV2.close();
    }
+
+
+
+ private static void updateEntity(AtlasEntity.AtlasEntityWithExtInfo
entity) throws AtlasServiceException {
+ LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(),
entity);
+
+ atlasClientV2.updateEntity(entity);
+
+ LOG.debug("Updated {} entity: name={}, guid={}",
entity.getEntity().getTypeName(),
entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME),
entity.getEntity().getGuid());
+ }
+
+ private static void
clearRelationshipAttributes(AtlasEntity.AtlasEntityWithExtInfo entity) {
+ if (entity != null) {
+ clearRelationshipAttributes(entity.getEntity());
+
+ if (entity.getReferredEntities() != null) {
+
clearRelationshipAttributes(entity.getReferredEntities().values());
+ }
+ }
+ }
+
+ private static void clearRelationshipAttributes(Collection<AtlasEntity>
entities) {
+ if (entities != null) {
+ for (AtlasEntity entity : entities) {
+ clearRelationshipAttributes(entity);
+ }
+ }
+ }
+
+ private static void clearRelationshipAttributes(AtlasEntity entity) {
+ if (entity != null && entity.getRelationshipAttributes() != null) {
+ entity.getRelationshipAttributes().clear();
+ }
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
new file mode 100644
index 000000000..ef24cb6e4
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TrinoClientHelper {
+
+ private static String jdbcUrl;
+ private static String username;
+ private static String password;
+
+ public TrinoClientHelper(Configuration atlasConf) {
+ this.jdbcUrl = atlasConf.getString("atlas.trino.jdbc.address");
+ this.username = atlasConf.getString("atlas.trino.jdbc.user");
+ this.password = atlasConf.getString("atlas.trino.jdbc.password", "");
+ }
+
+ public static Connection getTrinoConnection() throws SQLException {
+ return DriverManager.getConnection(jdbcUrl, username, password);
+ }
+
+ public Map<String, String> getAllTrinoCatalogs() {
+ Map<String, String> catalogs = new HashMap<>();
+ try {
+ Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement();
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT catalog_name, connector_name FROM
system.metadata.catalogs");
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+ while (rs.next()) {
+ catalogs.put(rs.getString("catalog_name"),
rs.getString("connector_name"));
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+
+ return catalogs;
+ }
+
+ public List<String> getTrinoSchemas(String catalog, String schemaToImport)
throws SQLException {
+ List<String> schemas = new ArrayList<>();
+ Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement();
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT schema_name FROM " + catalog +
".information_schema.schemata");
+
+ if (StringUtils.isNotEmpty(schemaToImport)) {
+ query.append(" where schema_name = '" + schemaToImport + "'");
+ }
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+ while (rs.next()) {
+ schemas.add(rs.getString("schema_name"));
+ }
+
+ return schemas;
+ }
+
+ public List<String> getTrinoTables(String catalog, String schema, String
tableToImport) throws SQLException {
+ List<String> tables = new ArrayList<>();
+ Connection connection = getTrinoConnection();
+ Statement stmt = connection.createStatement();
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT table_name FROM " + catalog +
".information_schema.tables WHERE table_schema = '" + schema + "'");
+ if (StringUtils.isNotEmpty(tableToImport)) {
+ query.append(" and table_name = '" + tableToImport + "'");
+ }
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+ while (rs.next()) {
+ tables.add(rs.getString("table_name"));
+ }
+
+ return tables;
+ }
+
+ public Map<String, Map<String, Object>> getTrinoColumns(String catalog,
String schema, String table) throws SQLException {
+ Map<String, Map<String, Object>> columns = new HashMap<>();
+ Connection connection = getTrinoConnection();
+ Statement stmt =
connection.createStatement();
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT column_name, ordinal_position, column_default,
is_nullable, data_type FROM " + catalog + ".information_schema.columns WHERE
table_schema = '" + schema + "' AND table_name = '" + table + "'");
+
+ ResultSet rs = stmt.executeQuery(query.toString());
+ while (rs.next()) {
+ Map<String, Object> columnMetadata = new HashMap<>();
+ columnMetadata.put("ordinal_position",
rs.getInt("ordinal_position"));
+ columnMetadata.put("column_default",
rs.getString("column_default"));
+ columnMetadata.put("column_name", rs.getString("column_name"));
+ if (StringUtils.isNotEmpty(rs.getString("is_nullable"))) {
+ if (StringUtils.equalsIgnoreCase(rs.getString("is_nullable"),
"YES")) {
+ columnMetadata.put("is_nullable", true);
+ } else {
+ columnMetadata.put("is_nullable", false);
+ }
+ }
+ columnMetadata.put("data_type", rs.getString("data_type"));
+
+ columns.put(rs.getString("column_name"), columnMetadata);
+ }
+
+ return columns;
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
new file mode 100644
index 000000000..33e716ee3
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.List;
+
/**
 * Contract for linking Trino entities (catalog/schema/table/column) to the corresponding
 * entities of an underlying source system (e.g. Hive, RDBMS) in Atlas. Implementations
 * receive the Trino-side entity being built and may attach relationship attributes and
 * referred entities to it.
 */
public abstract class AtlasEntityConnector {

    /** Links the given trino_catalog entity to its source-system counterpart, if any. */
    public abstract void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);

    /** Links the given trino_schema entity to its source-system counterpart, if any. */
    public abstract void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);

    /** Links the given trino_table entity (and its column entities) to source-system counterparts, if any. */
    public abstract void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
new file mode 100644
index 000000000..4fa4d2b90
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectorFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConnectorFactory.class);
+
+ public static AtlasEntityConnector getConnector(String connectorType) {
+ switch (connectorType.toLowerCase()) {
+ case "mysql":
+ return new RdbmsEntityConnector();
+ case "hive":
+ return new HiveEntityConnector();
+ default:
+ LOG.warn("{} type does not have hook implemented on Atlas");
+ return null;
+ }
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
new file mode 100644
index 000000000..3a272435d
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class HiveEntityConnector extends AtlasEntityConnector {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveEntityConnector.class);
+
+ public static final String HIVE_INSTANCE =
"hms_instance";
+ public static final String HIVE_DB =
"hive_db";
+ public static final String HIVE_TABLE =
"hive_table";
+ public static final String HIVE_COLUMN =
"hive_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP =
"trino_schema_hive_db";
+ public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP =
"trino_table_hive_table";
+ public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP =
"trino_column_hive_column";
+ public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE =
"hive_db";
+ public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE =
"hive_table";
+ public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE =
"hive_column";
+ @Override
+ public void connectTrinoCatalog(String instanceName, String catalogName,
AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+
+ }
+
+ @Override
+ public void connectTrinoSchema(String instanceName, String catalogName,
String schemaName, AtlasEntity dbEntity, AtlasEntity.AtlasEntityWithExtInfo
entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to connect entity since hook namespace
is empty, Please configure in properties");
+ return;
+ }
+
+ AtlasEntity hiveDb = null;
+ try {
+ hiveDb = toDbEntity(instanceName, schemaName);
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+
+ if (hiveDb != null) {
+ dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb,
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+ }
+ }
+
+ @Override
+ public void connectTrinoTable(String instanceName, String catalogName,
String schemaName, String tableName, AtlasEntity trinoTable, List<AtlasEntity>
columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ if (instanceName == null) {
+ LOG.warn("Failed attempting to entity since hook namespace is
empty, Please configure in properties");
+ return;
+ }
+
+ AtlasEntity hiveTable;
+ try {
+ hiveTable = toTableEntity(instanceName, schemaName, tableName);
+
+ if (hiveTable != null) {
+
trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable,
TRINO_TABLE_HIVE_TABLE_RELATIONSHIP));
+
+ for (AtlasEntity columnEntity : columnEntities) {
+ connectTrinoColumn(instanceName, schemaName, tableName,
columnEntity);
+ }
+ }
+ } catch (AtlasServiceException e) {
+ LOG.error("Error encountered: ", e);
+ }
+ }
+
+ public void connectTrinoColumn(String instanceName, String schemaName,
String tableName, AtlasEntity trinoColumn) throws AtlasServiceException {
+ if (instanceName == null) {
+ return;
+ }
+
+ AtlasEntity hiveColumn;
+ try {
+ hiveColumn = toColumnEntity(instanceName, schemaName, tableName,
trinoColumn.getAttribute("name").toString());
+ } catch (AtlasServiceException e) {
+ throw new AtlasServiceException(e);
+ }
+ if (hiveColumn != null) {
+
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE,
AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn,
TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP));
+ }
+ }
+
+ private AtlasEntity toDbEntity(String instanceName, String schemaName)
throws AtlasServiceException {
+ String dbQualifiedName = schemaName + "@"
+ instanceName;
+ AtlasEntity.AtlasEntityWithExtInfo ret =
AtlasClientHelper.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toTableEntity(String instanceName, String schemaName,
String tableName) throws AtlasServiceException {
+ String tableQualifiedName = schemaName +
"." + tableName + "@" + instanceName;
+ AtlasEntity.AtlasEntityWithExtInfo ret =
AtlasClientHelper.findEntity(HIVE_TABLE, tableQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+
+ private AtlasEntity toColumnEntity(String instanceName, String schemaName,
String tableName, String columnName) throws AtlasServiceException {
+ String columnQualifiedName = schemaName +
"." + tableName + "." + columnName + "@" + instanceName;
+ AtlasEntity.AtlasEntityWithExtInfo ret =
AtlasClientHelper.findEntity(HIVE_COLUMN, columnQualifiedName, true, true);
+
+ return ret != null ? ret.getEntity() : null;
+ }
+}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java
new file mode 100644
index 000000000..1f1bb4d77
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
/**
 * Placeholder connector for RDBMS-backed Trino catalogs (e.g. mysql). All connect
 * operations are currently no-ops — RDBMS linkage is not yet implemented.
 */
public class RdbmsEntityConnector extends AtlasEntityConnector {
    // NOTE(review): LOG is currently unused; kept for when the stubs below are filled in.
    private static final Logger LOG = LoggerFactory.getLogger(RdbmsEntityConnector.class);

    /** No-op: RDBMS catalog linkage not yet implemented. */
    @Override
    public void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }

    /** No-op: RDBMS schema linkage not yet implemented. */
    @Override
    public void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }

    /** No-op: RDBMS table/column linkage not yet implemented. */
    @Override
    public void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }
}
diff --git
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
new file mode 100644
index 000000000..5130ccb82
--- /dev/null
+++
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.model;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.connector.AtlasEntityConnector;
+import org.apache.atlas.trino.connector.ConnectorFactory;
+
+public class Catalog {
+ private String instanceName;
+ private String name;
+ private String type;
+ private String schemaToImport;
+ private String tableToImport;
+ private String hookInstanceName;
+ private boolean hookEnabled;
+ private AtlasEntityConnector connector;
+ private AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity;
+
+ public Catalog(String name, String type, boolean hookEnabled, String
hookInstanceName, String instanceName) {
+ this.name = name;
+ this.type = type;
+ this.hookEnabled = hookEnabled;
+ this.hookInstanceName = hookInstanceName;
+ this.instanceName = instanceName;
+ setConnector();
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setConnector() {
+ if (hookEnabled) {
+ connector = ConnectorFactory.getConnector(type);
+ }
+ }
+
+ public AtlasEntityConnector getConnector() {
+ return connector;
+ }
+
+ public String getHookInstanceName() {
+ return hookInstanceName;
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public AtlasEntity.AtlasEntityWithExtInfo getTrinoInstanceEntity() {
+ return trinoInstanceEntity;
+ }
+
+ public void setTrinoInstanceEntity(AtlasEntity.AtlasEntityWithExtInfo
trinoInstanceEntity) {
+ this.trinoInstanceEntity = trinoInstanceEntity;
+ }
+
+ public String getTableToImport() {
+ return tableToImport;
+ }
+
+ public void setTableToImport(String tableToImport) {
+ this.tableToImport = tableToImport;
+ }
+
+ public String getSchemaToImport() {
+ return schemaToImport;
+ }
+
+ public void setSchemaToImport(String schemaToImport) {
+ this.schemaToImport = schemaToImport;
+ }
+}
diff --git
a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
new file mode 100644
index 000000000..4a6ef475f
--- /dev/null
+++
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
/**
 * Integration-test placeholder for the Trino extractor; the planned test cases are
 * listed below but not yet implemented.
 */
public class TrinoExtractorIT {

    /* List of testcases
       Invalid arguments
       Invalid cron expression
       Test valid catalog to be run
       Test instance creation
       Test catalog creation
       Test schema creation
       Test table creation
       Test that when the hook is enabled, the hook entity (if created) is connected to the Trino entity
       Test cron doesn't trigger a new job before the earlier thread completes
       Test without cron expression
       Test that even if a catalog is not registered, it should run if passed from the command line
       Deleted table
       Deleted catalog
       Deleted column
       Deleted schema
       Rename catalog
       Rename schema
       Tag propagated */

}
diff --git a/distro/pom.xml b/distro/pom.xml
index af0d01060..d7e958889 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -276,6 +276,7 @@ atlas.graph.storage.hbase.regions-per-server=1
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
+
<descriptor>src/main/assemblies/atlas-trino-extractor.xml</descriptor>
</descriptors>
<finalName>apache-atlas-${project.version}</finalName>
<tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/atlas-trino-extractor.xml
b/distro/src/main/assemblies/atlas-trino-extractor.xml
new file mode 100644
index 000000000..923c8dc95
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-trino-extractor.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <formats>
+ <format>tar.gz</format>
+ </formats>
+ <id>trino-extractor</id>
+
<baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory>
+ <fileSets>
+ <fileSet>
+ <includes>
+ <include>README*</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/src/main/conf</directory>
+ <outputDirectory>/conf</outputDirectory>
+ <includes>
+ <include>atlas-log4j.xml</include>
+ <include>atlas-application.properties</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/src/main/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <includes>
+ <include>run-trino-extractor.sh</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <directoryMode>0755</directoryMode>
+ </fileSet>
+ <fileSet>
+
<directory>../addons/trino-extractor/target/dependency/trino</directory>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>../addons/trino-extractor/target</directory>
+ <outputDirectory>/lib</outputDirectory>
+ <includes>
+ <include>atlas-trino-extractor-*.jar</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+
+</assembly>
diff --git a/pom.xml b/pom.xml
index 5794929e6..073567ee8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
<module>addons/sqoop-bridge-shim</module>
<module>addons/storm-bridge</module>
<module>addons/storm-bridge-shim</module>
+ <module>addons/trino-extractor</module>
<module>atlas-examples</module>
<module>authorization</module>
<module>build-tools</module>