This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch ATLAS-5021_u
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 9627a0522fc8be1a1a2e3ea16bdf32f7ca1d0cc0
Author: Pinal Shah <[email protected]>
AuthorDate: Mon Aug 18 11:34:23 2025 +0530

    ATLAS-5021: Extract Metadata from Trino periodically
---
 addons/models/6000-Trino/6000-trino_model.json     | 185 +++++++++
 addons/trino-extractor/pom.xml                     |  96 +++++
 .../src/main/bin/run-trino-extractor.sh            | 138 +++++++
 .../src/main/conf/atlas-application.properties     |  32 ++
 .../trino-extractor/src/main/conf/atlas-log4j.xml  |  42 ++
 .../apache/atlas/trino/cli/ExtractorContext.java   | 106 +++++
 .../apache/atlas/trino/cli/ExtractorService.java   | 352 +++++++++++++++++
 .../org/apache/atlas/trino/cli/TrinoExtractor.java | 229 +++++++++++
 .../atlas/trino/client/AtlasClientHelper.java      | 438 +++++++++++++++++++++
 .../atlas/trino/client/TrinoClientHelper.java      | 151 +++++++
 .../trino/connector/AtlasEntityConnector.java      |  31 ++
 .../atlas/trino/connector/ConnectorFactory.java    |  38 ++
 .../atlas/trino/connector/HiveEntityConnector.java | 119 ++++++
 .../trino/connector/IcebergEntityConnector.java    | 119 ++++++
 .../java/org/apache/atlas/trino/model/Catalog.java |  85 ++++
 .../src/main/resources/atlas-logback.xml           |  51 +++
 .../apache/atlas/trino/cli/TrinoExtractorIT.java   |  42 ++
 distro/pom.xml                                     |   1 +
 .../src/main/assemblies/atlas-trino-extractor.xml  |  65 +++
 pom.xml                                            |   1 +
 20 files changed, 2321 insertions(+)

diff --git a/addons/models/6000-Trino/6000-trino_model.json 
b/addons/models/6000-Trino/6000-trino_model.json
new file mode 100644
index 000000000..734038530
--- /dev/null
+++ b/addons/models/6000-Trino/6000-trino_model.json
@@ -0,0 +1,185 @@
+{
+  "entityDefs": [
+    {
+      "name":         "trino_instance",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "hostname", "typeName": "string", "isOptional": true, 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
+        { "name": "port",     "typeName": "int",    "isOptional": true, 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false }
+      ]
+    },
+    {
+      "name":           "trino_catalog",
+      "superTypes":     [ "Asset" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [
+        { "name": "connectorType", "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false },
+        { "name": "connectionUrl", "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_schema",
+      "superTypes":   [ "Asset" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "parameters", "typeName": "map<string,string>", 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true }
+      ]
+    },
+    {
+      "name":         "trino_table",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "comment",    "typeName": "string",             
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true },
+        { "name": "parameters", "typeName": "map<string,string>", 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true },
+        { "name": "table_type", "typeName": "string",             
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true }
+      ]
+    },
+    {
+      "name":         "trino_column",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "data_type",        "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": true,  "isOptional": true },
+        { "name": "ordinal_position", "typeName": "int",     "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "column_default",   "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "comment",          "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "is_nullable",      "typeName": "boolean", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "isPrimaryKey",     "typeName": "boolean", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_column_lineage",
+      "superTypes":   [ "Process" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "expression", "typeName": "string", "cardinality": "SINGLE", 
"isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_process",
+      "superTypes":   [ "Process" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "queryId",         "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryCreateTime", "typeName": "long",   "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryEndTime",    "typeName": "long",   "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryText",       "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }
+      ]
+    },
+    {
+      "name":           "trino_schema_ddl",
+      "superTypes":     [ "ddl" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [ ]
+    },
+    {
+      "name":           "trino_table_ddl",
+      "superTypes":     [ "ddl" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [ ]
+    }
+  ],
+  "relationshipDefs": [
+    {
+      "name":                 "trino_instance_catalog",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_catalog",  "name": "instance", 
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_instance", "name": "catalogs", 
"isContainer": true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_schema_catalog",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_schema",  "name": "catalog", "isContainer": 
false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_catalog", "name": "schemas", "isContainer": 
true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_table_schema",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",  "name": "trinoschema", 
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_schema", "name": "tables",      
"isContainer": true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_table_columns",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",  "name": "columns", "isContainer": 
true,  "cardinality": "SET",    "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_column", "name": "table",   "isContainer": 
false, "cardinality": "SINGLE", "isLegacyAttribute": false }
+    },
+    {
+      "name":                 "trino_table_ddl_queries",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",     "name": "ddlQueries", 
"isContainer": true,  "cardinality": "SET"    },
+      "endDef2": { "type": "trino_table_ddl", "name": "table",      
"isContainer": false, "cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_schema_ddl_queries",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_schema",     "name": "ddlQueries", 
"isContainer": true,  "cardinality": "SET"    },
+      "endDef2": { "type": "trino_schema_ddl", "name": "db",         
"isContainer": false, "cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_process_column_lineage",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_column_lineage", "name": "query",          
"isContainer": false, "cardinality": "SINGLE" },
+      "endDef2": { "type": "trino_process",        "name": "columnLineages", 
"isContainer": true,  "cardinality": "SET"    }
+    },
+    {
+      "name":                 "trino_schema_hive_db",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_db",      "name": "trino_schema",  
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_schema", "name": "hive_db",       
"cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_table_hive_table",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_table",  "name": "trino_table", 
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_table", "name": "hive_table",  
"cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_column_hive_column",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_column",  "name": "trino_column", 
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_column", "name": "hive_column",  
"cardinality": "SINGLE" }
+    }
+  ]
+}
diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml
new file mode 100644
index 000000000..70dcbcd5f
--- /dev/null
+++ b/addons/trino-extractor/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.atlas</groupId>
+        <artifactId>apache-atlas</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <artifactId>atlas-trino-extractor</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Apache Atlas Trino Bridge</name>
+    <description>Apache Atlas Trino Bridge Module</description>
+
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
+    
+    <dependencies>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.9</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-jdbc</artifactId>
+            <version>403</version>
+            <!-- java8 supported version -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-intg</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+            <version>2.3.2</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <excludeScope>test</excludeScope>
+                            <includeScope>compile</includeScope>
+                            
<outputDirectory>${project.build.directory}/dependency/trino</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh 
b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
new file mode 100755
index 000000000..a7d4c8d71
--- /dev/null
+++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
@@ -0,0 +1,138 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+BASEDIR=`dirname ${PRG}`
+BASEDIR=`cd ${BASEDIR}/..;pwd`
+
+if test -z "${JAVA_HOME}"
+then
+    JAVA_BIN=`which java`
+    JAR_BIN=`which jar`
+else
+    JAVA_BIN="${JAVA_HOME}/bin/java"
+    JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+  echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure 
java and jar commands are available."
+  exit 1
+fi
+
+# Construct Atlas classpath using jars from hook/hive/atlas-hive-plugin-impl/ 
directory.
+for i in "${BASEDIR}/lib/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then
+    ATLAS_CONF_DIR="${BASEDIR}/conf/"
+fi
+ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
+
+# log dir for applications
+ATLAS_LOG_DIR="${BASEDIR}/log"
+export ATLAS_LOG_DIR
+LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log"
+
+TIME=`date +%Y%m%d%H%M%s`
+
+CP="${ATLASCPPATH}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+   ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
+   LOGFILE=`cygpath -w ${LOGFILE}`
+   HIVE_CP=`cygpath -w ${HIVE_CP}`
+   HADOOP_CP=`cygpath -w ${HADOOP_CP}`
+   CP=`cygpath -w -p ${CP}`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR 
-Datlas.log.file=atlas-trino-extractor.log
+-Dlog4j.configuration=atlas-log4j.xml -Djdk.httpclient.HttpClient.log=requests 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
+
+IMPORT_ARGS=()
+JVM_ARGS=
+
+set -f
+while true
+do
+  option=${1}
+  shift
+
+  case "${option}" in
+    -c) IMPORT_ARGS+=("-c" "$1"); shift;;
+    -s) IMPORT_ARGS+=("-s" "$1"); shift;;
+    -t) IMPORT_ARGS+=("-t" "$1"); shift;;
+    -cx)
+        CRON_EXPR="$1"
+        shift
+        while [[ "$1" != "" && "$1" != -* ]]; do
+          CRON_EXPR="$CRON_EXPR $1"
+          shift
+        done
+             IMPORT_ARGS+=("-cx" "$CRON_EXPR");;
+    -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");;
+    --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;;
+    --table) IMPORT_ARGS+=("--table" "$1"); shift;;
+    --schema) IMPORT_ARGS+=("--schema" "$1"); shift;;
+    --cronExpression)
+          CRON_EXPR="$1"
+          shift
+          while [[ "$1" != "" && "$1" != -* ]]; do
+            CRON_EXPR="$CRON_EXPR $1"
+            shift
+          done
+             IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");;
+    --help) export HELP_OPTION="true"; IMPORT_ARGS+=("--help");;
+    -*)
+           echo "Invalid argument found"
+           export HELP_OPTION="true"; IMPORT_ARGS+=("--help")
+           break;;
+    "") break;;
+  esac
+done
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+
+if [ -z ${HELP_OPTION} ]; then
+  echo "Log file for import is $LOGFILE"
+fi
+
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" 
org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}"
+
+set +f
+
+RETVAL=$?
+if [ -z ${HELP_OPTION} ]; then
+  [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully!
+  [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at: 
$LOGFILE for details.
+fi
+
+exit $RETVAL
diff --git a/addons/trino-extractor/src/main/conf/atlas-application.properties 
b/addons/trino-extractor/src/main/conf/atlas-application.properties
new file mode 100755
index 000000000..d709ad016
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-application.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######## Atlas connection ############
+atlas.rest.address=http://localhost:21000/
+
+######## Trino connection ############
+atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/
+atlas.trino.jdbc.user=<username>
+
+######## Trino environment name ######
+atlas.trino.namespace=cm
+#atlas.trino.catalogs.registered=
+
+######## Datasource for which ########
+######## Atlas hook is enabled #######
+#atlas.trino.catalog.hook.enabled.hive_catalog=true
+#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
\ No newline at end of file
diff --git a/addons/trino-extractor/src/main/conf/atlas-log4j.xml 
b/addons/trino-extractor/src/main/conf/atlas-log4j.xml
new file mode 100644
index 000000000..371c3b3e3
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-log4j.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/";>
+    <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+        <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/>
+        <param name="Append" value="true"/>
+        <param name="maxFileSize" value="100MB" />
+        <param name="maxBackupIndex" value="20" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m 
(%C{1}:%L)%n"/>
+        </layout>
+    </appender>
+
+    <logger name="org.apache.atlas.trino" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <root>
+        <priority value="warn"/>
+        <appender-ref ref="FILE"/>
+    </root>
+</log4j:configuration>
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
new file mode 100644
index 000000000..4cc14969a
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+
+public class ExtractorContext {
+    static final String TRINO_NAMESPACE_CONF         = "atlas.trino.namespace";
+    static final String DEFAULT_TRINO_NAMESPACE      = "cm";
+    static final String OPTION_CATALOG_SHORT         = "c";
+    static final String OPTION_CATALOG_LONG          = "catalog";
+    static final String OPTION_SCHEMA_SHORT          = "s";
+    static final String OPTION_SCHEMA_LONG           = "schema";
+    static final String OPTION_TABLE_SHORT           = "t";
+    static final String OPTION_TABLE_LONG            = "table";
+    static final String OPTION_CRON_EXPRESSION_SHORT = "cx";
+    static final String OPTION_CRON_EXPRESSION_LONG  = "cronExpression";
+    static final String OPTION_HELP_SHORT            = "h";
+    static final String OPTION_HELP_LONG             = "help";
+
+    private final Configuration     atlasConf;
+    private final String            namespace;
+    private final String            catalog;
+    private final String            schema;
+    private final String            table;
+    private final AtlasClientHelper atlasClientHelper;
+    private final TrinoClientHelper trinoClientHelper;
+    private final String            cronExpression;
+
+    public ExtractorContext(CommandLine cmd) throws AtlasException, 
IOException {
+        this.atlasConf         = getAtlasProperties();
+        this.atlasClientHelper = createAtlasClientHelper();
+        this.trinoClientHelper = createTrinoClientHelper();
+        this.namespace         = atlasConf.getString(TRINO_NAMESPACE_CONF, 
DEFAULT_TRINO_NAMESPACE);
+        this.catalog           = cmd.getOptionValue(OPTION_CATALOG_SHORT);
+        this.schema            = cmd.getOptionValue(OPTION_SCHEMA_SHORT);
+        this.table             = cmd.getOptionValue(OPTION_TABLE_SHORT);
+        this.cronExpression    = 
cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT);
+    }
+
+    public Configuration getAtlasConf() {
+        return atlasConf;
+    }
+
+    public AtlasClientHelper getAtlasConnector() {
+        return atlasClientHelper;
+    }
+
+    public TrinoClientHelper getTrinoConnector() {
+        return trinoClientHelper;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public String getCatalog() {
+        return catalog;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    private Configuration getAtlasProperties() throws AtlasException {
+        return ApplicationProperties.get();
+    }
+
+    private TrinoClientHelper createTrinoClientHelper() {
+        return new TrinoClientHelper(atlasConf);
+    }
+
+    private AtlasClientHelper createAtlasClientHelper() throws IOException {
+        return new AtlasClientHelper(atlasConf);
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
new file mode 100644
index 000000000..554a32a21
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int    THREAD_POOL_SIZE         = 5;
+    public static final  int    CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String TRINO_NAME_ATTRIBUTE = "name";
+
+    private static final String TRINO_CATALOG_REGISTERED          = 
"atlas.trino.catalogs.registered";
+    private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        Map<String, String> catalogs = 
context.getTrinoConnector().getAllTrinoCatalogs();
+
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+
+        return true;
+    }
+
+    private void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+        } else {
+            List<Catalog> catalogsToProcess = new ArrayList<>();
+
+            if (StringUtils.isEmpty(context.getCatalog())) {
+                String[] registeredCatalogs = 
context.getAtlasConf().getStringArray(TRINO_CATALOG_REGISTERED);
+
+                if (registeredCatalogs != null) {
+                    for (String registeredCatalog : registeredCatalogs) {
+                        if (catalogInTrino.containsKey(registeredCatalog)) {
+                            catalogsToProcess.add(getCatalogInstance(context, 
registeredCatalog, catalogInTrino.get(registeredCatalog)));
+                        }
+                    }
+                }
+            } else {
+                if (catalogInTrino.containsKey(context.getCatalog())) {
+                    Catalog catalog = getCatalogInstance(context, 
context.getCatalog(), catalogInTrino.get(context.getCatalog()));
+
+                    catalog.setSchemaToImport(context.getSchema());
+                    catalog.setTableToImport(context.getTable());
+
+                    catalogsToProcess.add(catalog);
+                }
+            }
+
+            if (CollectionUtils.isEmpty(catalogsToProcess)) {
+                LOG.info("No catalogs found to extract");
+            } else {
+                LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+
+                AtlasEntityWithExtInfo trinoInstanceEntity = 
context.getAtlasConnector().createOrUpdateInstanceEntity(context.getNamespace());
+                ExecutorService        catalogExecutor     = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+
+                try {
+                    for (Catalog currentCatalog : catalogsToProcess) {
+                        catalogExecutor.submit(() -> {
+                            try {
+                                
currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                                processCatalog(context, currentCatalog);
+                            } catch (Exception e) {
+                                LOG.error("Error processing catalog: {}", 
currentCatalog, e);
+                            }
+                        });
+                    }
+                } finally {
+                    catalogExecutor.shutdown();
+                }
+
+                try {
+                    if 
(!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                        LOG.warn("Catalog processing did not complete within 
{} minutes", CATALOG_EXECUTION_TIMEOUT);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    LOG.error("Catalog processing was interrupted", e);
+                }
+
+                LOG.info("Catalogs scan completed");
+            }
+        }
+    }
+
+    private void processCatalog(ExtractorContext context, Catalog catalog) 
throws AtlasServiceException, SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting catalog: {}", catalog.getName());
+
+            String                 catalogName        = catalog.getName();
+            AtlasEntityWithExtInfo trinoCatalogEntity = 
context.getAtlasConnector().createOrUpdateCatalogEntity(catalog);
+            List<String>           schemas            = 
context.getTrinoConnector().getTrinoSchemas(catalogName, 
catalog.getSchemaToImport());
+
+            LOG.info("Found {} schemas in catalog {}", schemas.size(), 
catalogName);
+
+            processSchemas(context, catalog, trinoCatalogEntity.getEntity(), 
schemas);
+
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(context, schemas, 
trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    private void processSchemas(ExtractorContext context, Catalog catalog, 
AtlasEntity trinoCatalogEntity, List<String> schemaToImport) {
+        for (String schemaName : schemaToImport) {
+            LOG.info("Started extracting schema: {}", schemaName);
+
+            try {
+                AtlasEntityWithExtInfo           schemaEntity = 
context.getAtlasConnector().createOrUpdateSchemaEntity(catalog, 
trinoCatalogEntity, schemaName);
+                Map<String, Map<String, Object>> tables       = 
context.getTrinoConnector().getTrinoTables(catalog.getName(), schemaName, 
catalog.getTableToImport());
+
+                LOG.info("Found {} tables in schema {}.{}", tables.size(), 
catalog.getName(), schemaName);
+
+                processTables(context, catalog, schemaName, 
schemaEntity.getEntity(), tables);
+
+                if (StringUtils.isEmpty(context.getTable())) {
+                    deleteTables(context, new ArrayList<>(tables.keySet()), 
schemaEntity.getEntity().getGuid());
+                }
+            } catch (Exception e) {
+                LOG.error("Error processing schema: {}", schemaName, e);
+            }
+        }
+    }
+
+    private void processTables(ExtractorContext context, Catalog catalog, 
String schemaName, AtlasEntity schemaEntity,  Map<String, Map<String, Object>> 
trinoTables) {
+        for (String trinoTableName : trinoTables.keySet()) {
+            LOG.info("Started extracting table: {}", trinoTableName);
+
+            try {
+                Map<String, Map<String, Object>> trinoColumns = 
context.getTrinoConnector().getTrinoColumns(catalog.getName(), schemaName, 
trinoTableName);
+
+                LOG.info("Found {} columns in table {}.{}.{}", 
trinoColumns.size(), catalog.getName(), schemaName, trinoTableName);
+
+                context.getAtlasConnector().createOrUpdateTableEntity(catalog, 
schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, 
schemaEntity);
+            } catch (Exception e) {
+                LOG.error("Error processing table: {}", trinoTableName, e);
+            }
+        }
+    }
+
+    private void deleteCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            AtlasEntityHeader trinoInstance = 
context.getAtlasConnector().getTrinoInstance(context.getNamespace());
+
+            if (trinoInstance != null) {
+                Set<String> catalogsToDelete = getCatalogsToDelete(context, 
catalogInTrino, trinoInstance.getGuid());
+
+                if (CollectionUtils.isNotEmpty(catalogsToDelete)) {
+                    LOG.info("Atlas has {} catalogs in instance {} that are no 
more present in Trino, starting to delete", catalogsToDelete, 
trinoInstance.getGuid());
+
+                    for (String catalogGuid : catalogsToDelete) {
+                        try {
+                            deleteSchemas(context, Collections.emptyList(), 
catalogGuid);
+
+                            LOG.info("Deleting catalog: {}", catalogGuid);
+
+                            
context.getAtlasConnector().deleteByGuid(Collections.singleton(catalogGuid));
+                        } catch (AtlasServiceException e) {
+                            LOG.error("Error deleting catalog: {}", 
catalogGuid, e);
+                        }
+                    }
+                } else {
+                    LOG.info("Atlas has no catalogs to delete");
+                }
+            }
+
+            LOG.info("Catalogs scanned for deletion completed");
+        }
+    }
+
+    private void deleteSchemas(ExtractorContext context, List<String> 
schemasInTrino, String catalogGuid) {
+        try {
+            Set<String> schemasToDelete = getSchemasToDelete(context, 
schemasInTrino, catalogGuid);
+
+            if (CollectionUtils.isNotEmpty(schemasToDelete)) {
+                LOG.info("Atlas has {} schemas in catalog {} that are no more 
present in Trino, starting to delete", schemasToDelete.size(), catalogGuid);
+
+                for (String schemaGuid : schemasToDelete) {
+                    try {
+                        deleteTables(context, Collections.emptyList(), 
schemaGuid);
+
+                        LOG.info("Deleting schema: {}", schemaGuid);
+
+                        
context.getAtlasConnector().deleteByGuid(Collections.singleton(schemaGuid));
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Error in deleting schema: {}", schemaGuid, 
e);
+                    }
+                }
+            } else {
+                LOG.info("Atlas has no schemas to delete in catalog {}", 
catalogGuid);
+            }
+        } catch (AtlasServiceException e) {
+            LOG.error("Error deleting schemas in catalog {}", catalogGuid, e);
+        }
+    }
+
+    private void deleteTables(ExtractorContext context, List<String> 
tablesInTrino, String schemaGuid) {
+        try {
+            Set<String> tablesToDelete = getTablesToDelete(context, 
tablesInTrino, schemaGuid);
+
+            if (CollectionUtils.isNotEmpty(tablesToDelete)) {
+                LOG.info("Atlas has {} tables in schema {} that are no more 
present in Trino, starting to delete", tablesToDelete.size(), schemaGuid);
+
+                for (String tableGuid : tablesToDelete) {
+                    try {
+                        LOG.info("Deleting table: {}", tableGuid);
+
+                        
context.getAtlasConnector().deleteByGuid(Collections.singleton(tableGuid));
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Error deleting table: {}", tableGuid, e);
+                    }
+                }
+            } else {
+                LOG.info("Atlas has no tables to delete in schema {}", 
schemaGuid);
+            }
+        } catch (AtlasServiceException e) {
+            LOG.error("Error deleting tables in schema: {}", schemaGuid, e);
+        }
+    }
+
+    private static Catalog getCatalogInstance(ExtractorContext context, String 
catalogName, String connectorType) {
+        Catalog ret = null;
+
+        if (catalogName != null) {
+            boolean isHookEnabled = 
context.getAtlasConf().getBoolean(TRINO_CATALOG_HOOK_ENABLED_PREFIX + 
catalogName, false);
+            String  hookNamespace = null;
+
+            if (isHookEnabled) {
+                String hookNamespaceProperty = 
TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName + 
TRINO_CATALOG_HOOK_ENABLED_SUFFIX;
+
+                hookNamespace = 
context.getAtlasConf().getString(hookNamespaceProperty);
+
+                if (hookNamespace == null) {
+                    LOG.warn("Atlas Hook is enabled for {}, but '{}' found 
empty", catalogName, hookNamespaceProperty);
+                }
+            }
+
+            ret = new Catalog(catalogName, connectorType, isHookEnabled, 
hookNamespace, context.getNamespace());
+        }
+
+        return ret;
+    }
+
+    private static Set<String> getCatalogsToDelete(ExtractorContext context, 
Map<String, String> catalogInTrino, String instanceGuid) throws 
AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (instanceGuid != null) {
+            List<AtlasEntityHeader> catalogsInAtlas = 
context.getAtlasConnector().getAllCatalogsInInstance(instanceGuid);
+
+            if (CollectionUtils.isNotEmpty(catalogsInAtlas)) {
+                Map<String, String> finalCatalogInTrino = catalogInTrino == 
null ? Collections.emptyMap() : catalogInTrino;
+
+                ret = catalogsInAtlas.stream()
+                        .filter(entity -> 
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> 
!finalCatalogInTrino.containsKey(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+
+    private static Set<String> getSchemasToDelete(ExtractorContext context, 
List<String> schemasInTrino, String catalogGuid) throws AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (catalogGuid != null) {
+            List<AtlasEntityHeader> schemasInAtlas = 
context.getAtlasConnector().getAllSchemasInCatalog(catalogGuid);
+
+            if (CollectionUtils.isNotEmpty(schemasInAtlas)) {
+                List<String> finalSchemasInTrino = schemasInTrino == null ? 
Collections.emptyList() : schemasInTrino;
+
+                ret = schemasInAtlas.stream()
+                        .filter(entity -> 
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> 
!finalSchemasInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+
+    private static Set<String> getTablesToDelete(ExtractorContext context, 
List<String> tablesInTrino, String schemaGuid) throws AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (schemaGuid != null) {
+            List<AtlasEntityHeader> tablesInAtlas = 
context.getAtlasConnector().getAllTablesInSchema(schemaGuid);
+
+            if (CollectionUtils.isNotEmpty(tablesInAtlas)) {
+                List<String> finalTablesInTrino = tablesInTrino == null ? 
Collections.emptyList() : tablesInTrino;
+
+                ret = tablesInAtlas.stream()
+                        .filter(entity -> 
entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> 
!finalTablesInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
new file mode 100644
index 000000000..df1bd7420
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT;
+import static 
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG;
+import static 
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT;
+
+public class TrinoExtractor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TrinoExtractor.class);
+
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_FAILED  = 1;
+    private static final int EXIT_CODE_HELP    = 2;
+
+    private static TrinoExtractor instance;
+
+    private final ExtractorContext extractorContext;
+    private       int              exitCode = EXIT_CODE_FAILED;
+
+    private TrinoExtractor(String[] args) throws Exception {
+        extractorContext = createExtractorContext(args);
+    }
+
+    public static void main(String[] args) {
+        try {
+            instance = new TrinoExtractor(args);
+
+            if (instance.extractorContext != null) {
+                instance.run();
+            } else {
+                LOG.error("Extractor context is null. Cannot proceed with 
extraction.");
+
+                instance.exitCode = EXIT_CODE_FAILED;
+            }
+        } catch (Exception e) {
+            LOG.error("Extraction failed", e);
+
+            instance.exitCode = EXIT_CODE_FAILED;
+        }
+
+        System.exit(instance != null ? instance.exitCode : EXIT_CODE_FAILED);
+    }
+
+    private void run() {
+        try {
+            String cronExpression = extractorContext.getCronExpression();
+
+            if (StringUtils.isNotEmpty(cronExpression)) {
+                if (!CronExpression.isValidExpression(cronExpression)) {
+                    LOG.error("Invalid cron expression: {}", cronExpression);
+
+                    exitCode = EXIT_CODE_FAILED;
+                } else {
+                    LOG.info("Scheduling extraction for cron expression: {}", 
cronExpression);
+
+                    JobDetail job     = 
JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", 
"group1").build();
+                    Trigger   trigger = TriggerBuilder.newTrigger()
+                            
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow()
+                            .build();
+
+                    Scheduler scheduler = null;
+
+                    try {
+                        scheduler = new StdSchedulerFactory().getScheduler();
+
+                        scheduler.scheduleJob(job, trigger);
+                        scheduler.start();
+
+                        try {
+                            Thread.currentThread().join();
+                        } catch (InterruptedException ie) {
+                            LOG.info("Main thread interrupted, shutting down 
scheduler.");
+
+                            exitCode = EXIT_CODE_SUCCESS;
+                        }
+                    } finally {
+                        try {
+                            if (scheduler != null && !scheduler.isShutdown()) {
+                                scheduler.shutdown(true);
+                            }
+                        } catch (SchedulerException se) {
+                            LOG.warn("Error shutting down scheduler: {}", 
se.getMessage());
+                        }
+                    }
+                }
+            } else {
+                LOG.info("Running extraction once");
+
+                ExtractorService extractorService = new ExtractorService();
+
+                if (extractorService.execute(extractorContext)) {
+                    exitCode = EXIT_CODE_SUCCESS;
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error encountered. exitCode: {}", exitCode, e);
+        } finally {
+            if (extractorContext.getAtlasConnector() != null) {
+                extractorContext.getAtlasConnector().close();
+            }
+        }
+
+        System.exit(exitCode);
+    }
+
+    private ExtractorContext createExtractorContext(String[] args) throws 
AtlasBaseException, IOException {
+        Options acceptedCliOptions = prepareCommandLineOptions();
+
+        try {
+            CommandLine cmd = new BasicParser().parse(acceptedCliOptions, 
args, true);
+            List<String> argsNotProcessed = cmd.getArgList();
+
+            if (argsNotProcessed != null && !argsNotProcessed.isEmpty()) {
+                throw new AtlasBaseException("Unrecognized arguments.");
+            }
+
+            ExtractorContext ret = null;
+
+            if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) {
+                printUsage(acceptedCliOptions);
+                exitCode = EXIT_CODE_HELP;
+            } else {
+                ret = new ExtractorContext(cmd);
+                LOG.debug("Successfully initialized the extractor context.");
+            }
+
+            return ret;
+        } catch (ParseException | AtlasBaseException e) {
+            printUsage(acceptedCliOptions);
+
+            throw new AtlasBaseException("Invalid arguments. Reason: " + 
e.getMessage(), e);
+        } catch (AtlasException e) {
+            throw new AtlasBaseException("Error in getting Application 
Properties. Reason: " + e.getMessage(), e);
+        } catch (IOException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private static Options prepareCommandLineOptions() {
+        Options acceptedCliOptions = new Options();
+
+        return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT, 
OPTION_CATALOG_LONG, true, "Catalog name")
+                .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true, 
"Schema name")
+                .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table 
name")
+                .addOption(OPTION_CRON_EXPRESSION_SHORT, 
OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction")
+                .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print 
help message");
+    }
+
+    private static void printUsage(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(TrinoExtractor.class.getName(), options);
+    }
+
+    @DisallowConcurrentExecution
+    public static class MetadataJob implements Job {
+        private final Logger LOG = LoggerFactory.getLogger(MetadataJob.class);
+
+        public void execute(JobExecutionContext context) throws 
JobExecutionException {
+            ExtractorContext extractorContext = TrinoExtractor.instance != 
null ? TrinoExtractor.instance.extractorContext : null;
+
+            if (extractorContext != null) {
+                LOG.info("Executing metadata extraction at: {}", 
java.time.LocalTime.now());
+
+                ExtractorService extractorService = new ExtractorService();
+
+                try {
+                    if (extractorService.execute(extractorContext)) {
+                        TrinoExtractor.instance.exitCode = EXIT_CODE_SUCCESS;
+                    }
+                } catch (Exception e) {
+                    LOG.error("Error encountered: ", e);
+                    throw new JobExecutionException(e);
+                }
+                LOG.info("Completed executing metadata extraction at: {}", 
java.time.LocalTime.now());
+            }
+        }
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
new file mode 100644
index 000000000..70e623847
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
+public class AtlasClientHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasClientHelper.class);
+
+    public static final String TRINO_INSTANCE                   = 
"trino_instance";
+    public static final String TRINO_CATALOG                    = 
"trino_catalog";
+    public static final String TRINO_SCHEMA                     = 
"trino_schema";
+    public static final String TRINO_TABLE                      = 
"trino_table";
+    public static final String TRINO_COLUMN                     = 
"trino_column";
+    public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs";
+    public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE   = "schemas";
+    public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE     = "tables";
+    public static final String QUALIFIED_NAME_ATTRIBUTE         = 
"qualifiedName";
+    public static final String NAME_ATTRIBUTE                   = "name";
+    public static final int    DEFAULT_PAGE_LIMIT               = 10000;
+
+    private static final String DEFAULT_ATLAS_URL                        = 
"http://localhost:21000/";;
+    private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT      = 
"atlas.rest.address";
+    private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE   = 
"connectorType";
+    private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE         = 
"instance";
+    private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP      = 
"trino_instance_catalog";
+    private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE           = 
"catalog";
+    private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP        = 
"trino_schema_catalog";
+    private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE         = 
"data_type";
+    private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = 
"ordinal_position";
+    private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE    = 
"column_default";
+    private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE       = 
"is_nullable";
+    private static final String TRINO_COLUMN_TABLE_ATTRIBUTE             = 
"table";
+    private static final String TRINO_TABLE_TYPE                         = 
"table_type";
+    private static final String TRINO_TABLE_COLUMN_RELATIONSHIP          = 
"trino_table_columns";
+    private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP          = 
"trino_table_schema";
+    private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE             = 
"trinoschema";
+    private static final String TRINO_TABLE_COLUMN_ATTRIBUTE             = 
"columns";
+
+    private static AtlasClientV2 atlasClientV2;
+
+    public AtlasClientHelper(Configuration atlasConf) throws IOException {
+        atlasClientV2 = getAtlasClientV2Instance(atlasConf);
+    }
+
+    public List<AtlasEntityHeader> getAllCatalogsInInstance(String 
instanceGuid) throws AtlasServiceException {
+        List<AtlasEntityHeader> entities = 
getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE);
+
+        if (CollectionUtils.isNotEmpty(entities)) {
+            LOG.debug("Retrieved {} catalogs in Trino instance {}", 
entities.size(), instanceGuid);
+        } else {
+            LOG.debug("No catalog found in Trino instance {}", instanceGuid);
+        }
+
+        return entities;
+    }
+
+    public List<AtlasEntityHeader> getAllSchemasInCatalog(String catalogGuid) 
throws AtlasServiceException {
+        List<AtlasEntityHeader> entities = 
getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE);
+
+        if (CollectionUtils.isNotEmpty(entities)) {
+            LOG.debug("Retrieved {} schemas in Trino catalog {}", 
entities.size(), catalogGuid);
+        } else {
+            LOG.debug("No schema found in Trino catalog {}", catalogGuid);
+        }
+
+        return entities;
+    }
+
+    public List<AtlasEntityHeader> getAllTablesInSchema(String schemaGuid) 
throws AtlasServiceException {
+        List<AtlasEntityHeader> entities = 
getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE);
+
+        if (CollectionUtils.isNotEmpty(entities)) {
+            LOG.debug("Retrieved {} tables in Trino schema {}", 
entities.size(), schemaGuid);
+        } else {
+            LOG.debug("No table found in Trino schema {}", schemaGuid);
+        }
+
+        return entities;
+    }
+
+    public AtlasEntityHeader getTrinoInstance(String namespace)  {
+        try {
+            return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE, 
Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace));
+        } catch (AtlasServiceException e) {
+            return null;
+        }
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String 
trinoNamespace) throws AtlasServiceException {
+        String                 qualifiedName = trinoNamespace;
+        AtlasEntityWithExtInfo ret           = findEntity(TRINO_INSTANCE, 
qualifiedName, true, true);
+
+        if (ret == null) {
+            AtlasEntity entity = new AtlasEntity(TRINO_INSTANCE);
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, trinoNamespace);
+
+            ret = createEntity(new AtlasEntityWithExtInfo(entity));
+        }
+
+        return ret;
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateCatalogEntity(Catalog catalog) 
throws AtlasServiceException {
+        String catalogName    = catalog.getName();
+        String trinoNamespace = catalog.getInstanceName();
+        String qualifiedName  = String.format("%s@%s", catalogName, 
trinoNamespace);
+
+        AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG, qualifiedName, 
true, true);
+
+        if (ret == null) {
+            AtlasEntity entity = new AtlasEntity(TRINO_CATALOG);
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, catalogName);
+            entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE, 
catalog.getType());
+            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
 TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoCatalog(this, 
catalog.getHookInstanceName(), catalogName, entity, ret);
+            }
+
+            ret = createEntity(new AtlasEntityWithExtInfo(entity));
+        } else {
+            AtlasEntity entity = ret.getEntity();
+
+            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
 TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoCatalog(this, 
catalog.getHookInstanceName(), catalogName, entity, ret);
+            }
+
+            ret.setEntity(entity);
+
+            updateEntity(ret);
+        }
+
+        return ret;
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateSchemaEntity(Catalog catalog, 
AtlasEntity catalogEntity, String schema) throws AtlasServiceException {
+        String                 qualifiedName = String.format("%s.%s@%s", 
catalog.getName(), schema, catalog.getInstanceName());
+        AtlasEntityWithExtInfo ret           = findEntity(TRINO_SCHEMA, 
qualifiedName, true, true);
+
+        if (ret == null) {
+            AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA);
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, schema);
+            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, 
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoSchema(this, 
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+            }
+
+            ret = createEntity(new AtlasEntityWithExtInfo(entity));
+        } else {
+            AtlasEntity entity = ret.getEntity();
+
+            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, 
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoSchema(this, 
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+            }
+
+            ret.setEntity(entity);
+
+            updateEntity(ret);
+        }
+
+        return ret;
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, 
String schema, String table, Map<String, Object> tableMetadata, Map<String, 
Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws 
AtlasServiceException {
+        String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(), 
schema, table, catalog.getInstanceName());
+
+        AtlasEntityWithExtInfo ret;
+        AtlasEntityWithExtInfo tableEntityExt = findEntity(TRINO_TABLE, 
qualifiedName, true, true);
+
+        if (tableEntityExt == null) {
+            tableEntityExt = toTableEntity(catalog, schema, table, 
tableMetadata, trinoColumns, schemaEntity, tableEntityExt);
+            ret            = createEntity(tableEntityExt);
+        } else {
+            ret = toTableEntity(catalog, schema, table, tableMetadata, 
trinoColumns, schemaEntity, tableEntityExt);
+
+            updateEntity(ret);
+        }
+
+        return ret;
+    }
+
+    public void deleteByGuid(Set<String> guidTodelete) throws 
AtlasServiceException {
+        if (CollectionUtils.isNotEmpty(guidTodelete)) {
+            for (String guid : guidTodelete) {
+                EntityMutationResponse response = 
atlasClientV2.deleteEntityByGuid(guid);
+
+                if (response == null || 
response.getDeletedEntities().isEmpty()) {
+                    LOG.debug("Entity with guid : {} is not deleted", guid);
+                } else {
+                    LOG.debug("Entity with guid : {} is deleted", guid);
+                }
+            }
+        }
+    }
+
+    public AtlasEntityWithExtInfo findEntity(final String typeName, final 
String qualifiedName, boolean minExtInfo, boolean ignoreRelationship) throws 
AtlasServiceException {
+        try {
+            return atlasClientV2.getEntityByAttribute(typeName, 
Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo, 
ignoreRelationship);
+        } catch (AtlasServiceException e) {
+            if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                return null;
+            }
+
+            throw e;
+        }
+    }
+
+    public void close() {
+        if (atlasClientV2 != null) {
+            atlasClientV2.close();
+
+            atlasClientV2 = null;
+        }
+    }
+
+
+    private synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration 
atlasConf) throws IOException {
+        if (atlasClientV2 == null) {
+            String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};
+
+            if (atlasConf != null && 
ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT)))
 {
+                atlasEndpoint = 
atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
+            }
+
+            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                String[] basicAuthUsernamePassword = 
AuthenticationUtil.getBasicAuthenticationInput();
+
+                atlasClientV2 = new AtlasClientV2(atlasEndpoint, 
basicAuthUsernamePassword);
+            } else {
+                UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
+
+                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), 
atlasEndpoint);
+            }
+        }
+
+        return atlasClientV2;
+    }
+
+    private List<AtlasEntityHeader> getAllRelationshipEntities(String 
entityGuid, String relationshipAttributeName) throws AtlasServiceException {
+        List<AtlasEntityHeader> entities = new ArrayList<>();
+
+        if (entityGuid != null) {
+            final int pageSize = DEFAULT_PAGE_LIMIT;
+
+            for (int i = 0; ; i++) {
+                int offset = pageSize * i;
+
+                LOG.debug("Retrieving: offset={}, pageSize={}", offset, 
pageSize);
+
+                AtlasSearchResult       searchResult  = 
atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null, 
null, true, pageSize, offset);
+                List<AtlasEntityHeader> entityHeaders = searchResult == null ? 
null : searchResult.getEntities();
+                int                     count         = entityHeaders == null 
? 0 : entityHeaders.size();
+
+                if (count > 0) {
+                    entities.addAll(entityHeaders);
+                }
+
+                if (count < pageSize) { // last page
+                    break;
+                }
+            }
+        }
+
+        return entities;
+    }
+
+    private AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String 
schema, String table, Map<String, Object> tableMetadata, Map<String, 
Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, 
AtlasEntityWithExtInfo tableEntityExt)  {
+        if (tableEntityExt == null) {
+            tableEntityExt = new AtlasEntityWithExtInfo(new 
AtlasEntity(TRINO_TABLE));
+        }
+
+        AtlasEntity       tableEntity    = tableEntityExt.getEntity();
+        List<AtlasEntity> columnEntities = new ArrayList<>();
+        String            qualifiedName  = String.format("%s.%s.%s@%s", 
catalog.getName(), schema, table, catalog.getInstanceName());
+
+        tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+        tableEntity.setAttribute(NAME_ATTRIBUTE, table);
+        tableEntity.setAttribute(TRINO_TABLE_TYPE, 
tableMetadata.get(TRINO_TABLE_TYPE).toString());
+
+        for (Map.Entry<String, Map<String, Object>> columnEntry : 
trinoColumns.entrySet()) {
+            AtlasEntity entity           = new AtlasEntity(TRINO_COLUMN);
+            String      columnName       = columnEntry.getKey();
+            String      colQualifiedName = String.format("%s.%s.%s.%s@%s", 
catalog.getName(), schema, table, columnName, catalog.getInstanceName());
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, columnName);
+
+            if (MapUtils.isNotEmpty(columnEntry.getValue())) {
+                Map<String, Object> columnAttr = columnEntry.getValue();
+
+                entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE, 
columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE, 
columnAttr.get(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE, 
columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE, 
columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE));
+            }
+
+            entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity, 
TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+            columnEntities.add(entity);
+        }
+
+        tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity, 
TRINO_TABLE_SCHEMA_RELATIONSHIP));
+        tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities, 
TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+        if (catalog.getConnector() != null) {
+            catalog.getConnector().connectTrinoTable(this, 
catalog.getHookInstanceName(), catalog.getName(), schema, table, tableEntity, 
columnEntities, tableEntityExt);
+        }
+
+        tableEntityExt.addReferredEntity(schemaEntity);
+
+        for (AtlasEntity column : columnEntities) {
+            tableEntityExt.addReferredEntity(column);
+        }
+
+        return tableEntityExt;
+    }
+
+    private AtlasEntityWithExtInfo createEntity(AtlasEntityWithExtInfo entity) 
throws AtlasServiceException {
+        LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), 
entity);
+
+        AtlasEntityWithExtInfo  ret             = null;
+        EntityMutationResponse  response        = 
atlasClientV2.createEntity(entity);
+        List<AtlasEntityHeader> createdEntities = 
response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
+
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            for (AtlasEntityHeader createdEntity : createdEntities) {
+                if (ret == null) {
+                    ret = 
atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+                    LOG.debug("Created {} entity: name={}, guid={}", 
ret.getEntity().getTypeName(), 
ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
ret.getEntity().getGuid());
+                } else if (ret.getEntity(createdEntity.getGuid()) == null) {
+                    AtlasEntityWithExtInfo newEntity = 
atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+                    ret.addReferredEntity(newEntity.getEntity());
+
+                    if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
+                        for (Map.Entry<String, AtlasEntity> entry : 
newEntity.getReferredEntities().entrySet()) {
+                            ret.addReferredEntity(entry.getKey(), 
entry.getValue());
+                        }
+                    }
+
+                    LOG.debug("Created {} entity: name={}, guid={}", 
newEntity.getEntity().getTypeName(), 
newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
newEntity.getEntity().getGuid());
+                }
+            }
+        }
+
+        clearRelationshipAttributes(ret);
+
+        return ret;
+    }
+
+    private void updateEntity(AtlasEntityWithExtInfo entity) throws 
AtlasServiceException {
+        LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), 
entity);
+
+        atlasClientV2.updateEntity(entity);
+
+        LOG.debug("Updated {} entity: name={}, guid={}", 
entity.getEntity().getTypeName(), 
entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), 
entity.getEntity().getGuid());
+    }
+
+    private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
+        if (entity != null) {
+            clearRelationshipAttributes(entity.getEntity());
+
+            if (entity.getReferredEntities() != null) {
+                
clearRelationshipAttributes(entity.getReferredEntities().values());
+            }
+        }
+    }
+
+    private void clearRelationshipAttributes(Collection<AtlasEntity> entities) 
{
+        if (entities != null) {
+            for (AtlasEntity entity : entities) {
+                clearRelationshipAttributes(entity);
+            }
+        }
+    }
+
+    private void clearRelationshipAttributes(AtlasEntity entity) {
+        if (entity != null && entity.getRelationshipAttributes() != null) {
+            entity.getRelationshipAttributes().clear();
+        }
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
new file mode 100644
index 000000000..2ef3f1499
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TrinoClientHelper {
+    private final String jdbcUrl;
+    private final String username;
+    private final String password;
+
+    public TrinoClientHelper(Configuration atlasConf) {
+        this.jdbcUrl  = atlasConf.getString("atlas.trino.jdbc.address");
+        this.username = atlasConf.getString("atlas.trino.jdbc.user");
+        this.password = atlasConf.getString("atlas.trino.jdbc.password", "");
+    }
+
+    public Map<String, String> getAllTrinoCatalogs() {
+        Map<String, String> catalogs = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+             Statement  stmt       = connection.createStatement()) {
+            String     query = "SELECT catalog_name, connector_name FROM 
system.metadata.catalogs";
+            ResultSet  rs    = stmt.executeQuery(query);
+
+            while (rs.next()) {
+                catalogs.put(rs.getString("catalog_name"), 
rs.getString("connector_name"));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        return catalogs;
+    }
+
+    public List<String> getTrinoSchemas(String catalog, String schemaToImport) 
throws SQLException {
+        List<String>  schemas    = new ArrayList<>();
+
+        try (Connection connection = getTrinoConnection();
+             Statement  stmt       = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT schema_name FROM 
").append(catalog).append(".information_schema.schemata");
+
+            if (StringUtils.isNotEmpty(schemaToImport)) {
+                query.append(" where schema_name = 
'").append(schemaToImport).append("'");
+            }
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                schemas.add(rs.getString("schema_name"));
+            }
+        }
+
+        return schemas;
+    }
+
+    public Map<String, Map<String, Object>> getTrinoTables(String catalog, 
String schema, String tableToImport) throws SQLException {
+        Map<String, Map<String, Object>> tables     = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+             Statement  stmt       = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT table_name, table_type FROM 
").append(catalog).append(".information_schema.tables WHERE table_schema = 
'").append(schema).append("'");
+
+            if (StringUtils.isNotEmpty(tableToImport)) {
+                query.append(" and table_name = 
'").append(tableToImport).append("'");
+            }
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                Map<String, Object> tableMetadata = new HashMap<>();
+
+                tableMetadata.put("table_name", rs.getString("table_name"));
+                tableMetadata.put("table_type", rs.getString("table_type"));
+
+                tables.put(rs.getString("table_name"), tableMetadata);
+            }
+        }
+
+        return tables;
+    }
+
+    public Map<String, Map<String, Object>> getTrinoColumns(String catalog, 
String schema, String table) throws SQLException {
+        Map<String, Map<String, Object>> columns = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+             Statement  stmt       = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT column_name, ordinal_position, 
column_default, is_nullable, data_type FROM 
").append(catalog).append(".information_schema.columns WHERE table_schema = 
'").append(schema).append("' AND table_name = '").append(table).append("'");
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                Map<String, Object> columnMetadata = new HashMap<>();
+
+                columnMetadata.put("ordinal_position", 
rs.getInt("ordinal_position"));
+                columnMetadata.put("column_default", 
rs.getString("column_default"));
+                columnMetadata.put("column_name", rs.getString("column_name"));
+
+                if (StringUtils.isNotEmpty(rs.getString("is_nullable"))) {
+                    if 
(StringUtils.equalsIgnoreCase(rs.getString("is_nullable"), "YES")) {
+                        columnMetadata.put("is_nullable", true);
+                    } else {
+                        columnMetadata.put("is_nullable", false);
+                    }
+                }
+
+                columnMetadata.put("data_type", rs.getString("data_type"));
+
+                columns.put(rs.getString("column_name"), columnMetadata);
+            }
+        }
+
+        return columns;
+    }
+
+    private Connection getTrinoConnection() throws SQLException {
+        return DriverManager.getConnection(jdbcUrl, username, password);
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
new file mode 100644
index 000000000..eec4a6736
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+
+import java.util.List;
+
+public abstract class AtlasEntityConnector {
+    public abstract void connectTrinoCatalog(AtlasClientHelper atlasClient, 
String instanceName, String catalogName, AtlasEntity entity, 
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    public abstract void connectTrinoSchema(AtlasClientHelper atlasClient, 
String instanceName, String catalogName, String schemaName, AtlasEntity entity, 
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    public abstract void connectTrinoTable(AtlasClientHelper atlasClient, 
String instanceName, String catalogName, String schemaName, String tableName, 
AtlasEntity entity, List<AtlasEntity> columnEntities, 
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
new file mode 100644
index 000000000..f20205e29
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectorFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConnectorFactory.class);
+
+    public static AtlasEntityConnector getConnector(String connectorType) {
+        switch (connectorType.toLowerCase()) {
+            case "iceberg":
+                return new IcebergEntityConnector();
+            case "hive":
+                return new HiveEntityConnector();
+            default:
+                LOG.warn("{}: unrecognized connector type", connectorType);
+
+                return null;
+        }
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
new file mode 100644
index 000000000..df54d05d4
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class HiveEntityConnector extends AtlasEntityConnector {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HiveEntityConnector.class);
+
+    public static final String HIVE_DB                               = 
"hive_db";
+    public static final String HIVE_TABLE                            = 
"hive_table";
+    public static final String HIVE_COLUMN                           = 
"hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP     = 
"trino_schema_hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP   = 
"trino_table_hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP = 
"trino_column_hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE        = 
"hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE      = 
"hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE    = 
"hive_column";
+
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo 
entityWithExtInfo) {
+    }
+
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, 
schemaName);
+
+                if (hiveDb != null) {
+                    
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, 
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, String tableName, 
AtlasEntity trinoTable, List<AtlasEntity> columnEntities, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveTable = toTableEntity(atlasClient, 
instanceName, schemaName, tableName);
+
+                if (hiveTable != null) {
+                    
trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, 
TRINO_TABLE_HIVE_TABLE_RELATIONSHIP));
+
+                    for (AtlasEntity columnEntity : columnEntities) {
+                        connectTrinoColumn(atlasClient, instanceName, 
schemaName, tableName, columnEntity);
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) 
throws AtlasServiceException {
+        if (instanceName != null) {
+            try {
+                AtlasEntity hiveColumn = toColumnEntity(atlasClient, 
instanceName, schemaName, tableName, 
trinoColumn.getAttribute("name").toString());
+
+                if (hiveColumn != null) {
+                    
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn, 
TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                throw new AtlasServiceException(e);
+            }
+        }
+    }
+
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + 
instanceName;
+        AtlasEntityWithExtInfo ret             = 
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName) throws AtlasServiceException 
{
+        String                 tableQualifiedName = schemaName + "." + 
tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = 
atlasClient.findEntity(HIVE_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, String columnName) throws 
AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + 
tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = 
atlasClient.findEntity(HIVE_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
new file mode 100644
index 000000000..677c7834b
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class IcebergEntityConnector extends AtlasEntityConnector {
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergEntityConnector.class);
+
+    public static final String HIVE_DB                                  = 
"hive_db";
+    public static final String ICEBERG_TABLE                            = 
"iceberg_table";
+    public static final String ICEBERG_COLUMN                           = 
"iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP        = 
"trino_schema_hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP   = 
"trino_table_iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP = 
"trino_column_iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE           = 
"hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE      = 
"iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE    = 
"iceberg_column";
+
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo 
entityWithExtInfo) {
+    }
+
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, 
schemaName);
+
+                if (hiveDb != null) {
+                    
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, 
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, String tableName, 
AtlasEntity trinoTable, List<AtlasEntity> columnEntities, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity icebergTable = toTableEntity(atlasClient, 
instanceName, schemaName, tableName);
+
+                if (icebergTable != null) {
+                    
trinoTable.setRelationshipAttribute(TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(icebergTable, 
TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP));
+
+                    for (AtlasEntity columnEntity : columnEntities) {
+                        connectTrinoColumn(atlasClient, instanceName, 
schemaName, tableName, columnEntity);
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) 
throws AtlasServiceException {
+        if (instanceName != null) {
+            try {
+                AtlasEntity icebergColumn = toColumnEntity(atlasClient, 
instanceName, schemaName, tableName, 
trinoColumn.getAttribute("name").toString());
+
+                if (icebergColumn != null) {
+                    
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(icebergColumn, 
TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                throw new AtlasServiceException(e);
+            }
+        }
+    }
+
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + 
instanceName;
+        AtlasEntityWithExtInfo ret             = 
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName) throws AtlasServiceException 
{
+        String                 tableQualifiedName = schemaName + "." + 
tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = 
atlasClient.findEntity(ICEBERG_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, String columnName) throws 
AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + 
tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = 
atlasClient.findEntity(ICEBERG_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
new file mode 100644
index 000000000..06bba2f48
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.model;
+
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.connector.AtlasEntityConnector;
+import org.apache.atlas.trino.connector.ConnectorFactory;
+
+public class Catalog {
+    private final String                 name;
+    private final String                 type;
+    private final String                 hookInstanceName;
+    private final String                 instanceName;
+    private final AtlasEntityConnector   connector;
+    private       AtlasEntityWithExtInfo trinoInstanceEntity;
+    private       String                 schemaToImport;
+    private       String                 tableToImport;
+
+    public Catalog(String name, String type, boolean hookEnabled, String 
hookInstanceName, String instanceName) {
+        this.name             = name;
+        this.type             = type;
+        this.hookInstanceName = hookInstanceName;
+        this.instanceName     = instanceName;
+        this.connector        = hookEnabled ? 
ConnectorFactory.getConnector(type) : null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public AtlasEntityConnector getConnector() {
+        return connector;
+    }
+
+    public String getHookInstanceName() {
+        return hookInstanceName;
+    }
+
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    public AtlasEntityWithExtInfo getTrinoInstanceEntity() {
+        return trinoInstanceEntity;
+    }
+
+    public void setTrinoInstanceEntity(AtlasEntityWithExtInfo 
trinoInstanceEntity) {
+        this.trinoInstanceEntity = trinoInstanceEntity;
+    }
+
+    public String getTableToImport() {
+        return tableToImport;
+    }
+
+    public void setTableToImport(String tableToImport) {
+        this.tableToImport = tableToImport;
+    }
+
+    public String getSchemaToImport() {
+        return schemaToImport;
+    }
+
+    public void setSchemaToImport(String schemaToImport) {
+        this.schemaToImport = schemaToImport;
+    }
+}
diff --git a/addons/trino-extractor/src/main/resources/atlas-logback.xml 
b/addons/trino-extractor/src/main/resources/atlas-logback.xml
new file mode 100644
index 000000000..2a616c524
--- /dev/null
+++ b/addons/trino-extractor/src/main/resources/atlas-logback.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/${atlas.log.file}</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="org.apache.atlas.trino" additivity="false" level="info">
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <root level="warn">
+        <appender-ref ref="FILE"/>
+    </root>
+</configuration>
diff --git 
a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
 
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
new file mode 100644
index 000000000..7e94f381b
--- /dev/null
+++ 
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+public class TrinoExtractorIT {
+
+ /* List of testcases
+    Invalid Arguments
+    Invalid cron expression
+    Test valid Catalog to be run
+    Test Instance creation
+    Test catalog creation
+    Test schema creation
+    Test table creation
+    Test of hook is enabled, hook entity if created, is connected to Trino 
entity
+    Test cron doesn't trigger new job, before earlier thread completes
+    Test without cron expression
+    Test even if catalog is not registered, it should run if passed from 
commandLine
+    Deleted table
+    Deleted catalog
+    Deleted column
+    Deleted schema
+    Rename catalog
+    Rename schema
+    Tag propagated*/
+
+}
diff --git a/distro/pom.xml b/distro/pom.xml
index af0d01060..d7e958889 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -276,6 +276,7 @@ atlas.graph.storage.hbase.regions-per-server=1
                                         
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
                                         
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
                                         
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
+                                        
<descriptor>src/main/assemblies/atlas-trino-extractor.xml</descriptor>
                                     </descriptors>
                                     
<finalName>apache-atlas-${project.version}</finalName>
                                     <tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/atlas-trino-extractor.xml 
b/distro/src/main/assemblies/atlas-trino-extractor.xml
new file mode 100644
index 000000000..923c8dc95
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-trino-extractor.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
+          
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <id>trino-extractor</id>
+    
<baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory>
+        <fileSets>
+            <fileSet>
+                <includes>
+                    <include>README*</include>
+                </includes>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/src/main/conf</directory>
+                <outputDirectory>/conf</outputDirectory>
+                <includes>
+                    <include>atlas-log4j.xml</include>
+                    <include>atlas-application.properties</include>
+                </includes>
+                <fileMode>0755</fileMode>
+                <directoryMode>0755</directoryMode>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/src/main/bin</directory>
+                <outputDirectory>/bin</outputDirectory>
+                <includes>
+                    <include>run-trino-extractor.sh</include>
+                </includes>
+                <fileMode>0755</fileMode>
+                <directoryMode>0755</directoryMode>
+            </fileSet>
+            <fileSet>
+                
<directory>../addons/trino-extractor/target/dependency/trino</directory>
+                <outputDirectory>/lib</outputDirectory>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/target</directory>
+                <outputDirectory>/lib</outputDirectory>
+                <includes>
+                    <include>atlas-trino-extractor-*.jar</include>
+                </includes>
+            </fileSet>
+        </fileSets>
+
+</assembly>
diff --git a/pom.xml b/pom.xml
index 9341140d4..ac2d4df6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>addons/sqoop-bridge-shim</module>
         <module>addons/storm-bridge</module>
         <module>addons/storm-bridge-shim</module>
+        <module>addons/trino-extractor</module>
         <module>atlas-examples</module>
         <module>authorization</module>
         <module>build-tools</module>

Reply via email to