This is an automated email from the ASF dual-hosted git repository.

pinal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c49d3933 ATLAS-5021: Extract Metadata from Trino periodically (#428)
4c49d3933 is described below

commit 4c49d393382a655d179703b97758aabcffb0d5bf
Author: Pinal Shah <[email protected]>
AuthorDate: Tue Sep 9 12:15:17 2025 +0530

    ATLAS-5021: Extract Metadata from Trino periodically (#428)
---
 addons/models/6000-Trino/6000-trino_model.json     | 185 +++++++++
 addons/trino-extractor/pom.xml                     |  96 +++++
 .../src/main/bin/run-trino-extractor.sh            | 137 +++++++
 .../main/conf/atlas-trino-extractor-logback.xml    |  56 +++
 .../src/main/conf/atlas-trino-extractor.properties |  43 ++
 .../apache/atlas/trino/cli/ExtractorContext.java   | 123 ++++++
 .../apache/atlas/trino/cli/ExtractorService.java   | 349 +++++++++++++++++
 .../org/apache/atlas/trino/cli/TrinoExtractor.java | 229 +++++++++++
 .../atlas/trino/client/AtlasClientHelper.java      | 435 +++++++++++++++++++++
 .../atlas/trino/client/TrinoClientHelper.java      | 151 +++++++
 .../trino/connector/AtlasEntityConnector.java      |  31 ++
 .../atlas/trino/connector/ConnectorFactory.java    |  42 ++
 .../atlas/trino/connector/HiveEntityConnector.java | 118 ++++++
 .../trino/connector/IcebergEntityConnector.java    | 118 ++++++
 .../java/org/apache/atlas/trino/model/Catalog.java |  85 ++++
 .../apache/atlas/trino/cli/TrinoExtractorIT.java   |  40 ++
 distro/pom.xml                                     |   1 +
 .../src/main/assemblies/atlas-trino-extractor.xml  |  65 +++
 docs/src/documents/Tools/TrinoExtractor.md         | 149 +++++++
 .../graphdb/janus/AtlasJanusGraphManagement.java   |   2 +-
 pom.xml                                            |   1 +
 .../repository/graph/GraphBackedSearchIndexer.java |   1 -
 .../repository/patches/UniqueAttributePatch.java   |   1 -
 23 files changed, 2455 insertions(+), 3 deletions(-)

diff --git a/addons/models/6000-Trino/6000-trino_model.json 
b/addons/models/6000-Trino/6000-trino_model.json
new file mode 100644
index 000000000..734038530
--- /dev/null
+++ b/addons/models/6000-Trino/6000-trino_model.json
@@ -0,0 +1,185 @@
+{
+  "entityDefs": [
+    {
+      "name":         "trino_instance",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "hostname", "typeName": "string", "isOptional": true, 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false },
+        { "name": "port",     "typeName": "int",    "isOptional": true, 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false }
+      ]
+    },
+    {
+      "name":           "trino_catalog",
+      "superTypes":     [ "Asset" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [
+        { "name": "connectorType", "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false },
+        { "name": "connectionUrl", "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_schema",
+      "superTypes":   [ "Asset" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "parameters", "typeName": "map<string,string>", 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true }
+      ]
+    },
+    {
+      "name":         "trino_table",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "comment",    "typeName": "string",             
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true },
+        { "name": "parameters", "typeName": "map<string,string>", 
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true },
+        { "name": "table_type", "typeName": "string",             
"cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": 
true }
+      ]
+    },
+    {
+      "name":         "trino_column",
+      "superTypes":   [ "DataSet" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "data_type",        "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": true,  "isOptional": true },
+        { "name": "ordinal_position", "typeName": "int",     "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "column_default",   "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "comment",          "typeName": "string",  "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "is_nullable",      "typeName": "boolean", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true },
+        { "name": "isPrimaryKey",     "typeName": "boolean", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_column_lineage",
+      "superTypes":   [ "Process" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "expression", "typeName": "string", "cardinality": "SINGLE", 
"isUnique": false, "isIndexable": false, "isOptional": true }
+      ]
+    },
+    {
+      "name":         "trino_process",
+      "superTypes":   [ "Process" ],
+      "serviceType":  "trino",
+      "typeVersion":  "1.0",
+      "attributeDefs": [
+        { "name": "queryId",         "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryCreateTime", "typeName": "long",   "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryEndTime",    "typeName": "long",   "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 },
+        { "name": "queryText",       "typeName": "string", "cardinality": 
"SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }
+      ]
+    },
+    {
+      "name":           "trino_schema_ddl",
+      "superTypes":     [ "ddl" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [ ]
+    },
+    {
+      "name":           "trino_table_ddl",
+      "superTypes":     [ "ddl" ],
+      "serviceType":    "trino",
+      "typeVersion":    "1.0",
+      "attributeDefs":  [ ]
+    }
+  ],
+  "relationshipDefs": [
+    {
+      "name":                 "trino_instance_catalog",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_catalog",  "name": "instance", 
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_instance", "name": "catalogs", 
"isContainer": true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_schema_catalog",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_schema",  "name": "catalog", "isContainer": 
false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_catalog", "name": "schemas", "isContainer": 
true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_table_schema",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "AGGREGATION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",  "name": "trinoschema", 
"isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_schema", "name": "tables",      
"isContainer": true,  "cardinality": "SET" }
+    },
+    {
+      "name":                 "trino_table_columns",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",  "name": "columns", "isContainer": 
true,  "cardinality": "SET",    "isLegacyAttribute": false },
+      "endDef2": { "type": "trino_column", "name": "table",   "isContainer": 
false, "cardinality": "SINGLE", "isLegacyAttribute": false }
+    },
+    {
+      "name":                 "trino_table_ddl_queries",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_table",     "name": "ddlQueries", 
"isContainer": true,  "cardinality": "SET"    },
+      "endDef2": { "type": "trino_table_ddl", "name": "table",      
"isContainer": false, "cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_schema_ddl_queries",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_schema",     "name": "ddlQueries", 
"isContainer": true,  "cardinality": "SET"    },
+      "endDef2": { "type": "trino_schema_ddl", "name": "db",         
"isContainer": false, "cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_process_column_lineage",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "COMPOSITION",
+      "propagateTags":        "NONE",
+      "endDef1": { "type": "trino_column_lineage", "name": "query",          
"isContainer": false, "cardinality": "SINGLE" },
+      "endDef2": { "type": "trino_process",        "name": "columnLineages", 
"isContainer": true,  "cardinality": "SET"    }
+    },
+    {
+      "name":                 "trino_schema_hive_db",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_db",      "name": "trino_schema",  
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_schema", "name": "hive_db",       
"cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_table_hive_table",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_table",  "name": "trino_table", 
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_table", "name": "hive_table",  
"cardinality": "SINGLE" }
+    },
+    {
+      "name":                 "trino_column_hive_column",
+      "serviceType":          "trino",
+      "typeVersion":          "1.0",
+      "relationshipCategory": "ASSOCIATION",
+      "propagateTags":        "BOTH",
+      "endDef1": { "type": "hive_column",  "name": "trino_column", 
"cardinality": "SET"    },
+      "endDef2": { "type": "trino_column", "name": "hive_column",  
"cardinality": "SINGLE" }
+    }
+  ]
+}
diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml
new file mode 100644
index 000000000..a7a66b937
--- /dev/null
+++ b/addons/trino-extractor/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.atlas</groupId>
+        <artifactId>apache-atlas</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <artifactId>atlas-trino-extractor</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Apache Atlas Trino Bridge</name>
+    <description>Apache Atlas Trino Bridge Module</description>
+
+    <properties>
+        <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
+        <checkstyle.skip>false</checkstyle.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>1.9</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-jdbc</artifactId>
+            <version>403</version>
+            <!-- java8 supported version -->
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-intg</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.quartz-scheduler</groupId>
+            <artifactId>quartz</artifactId>
+            <version>2.3.2</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <excludeScope>test</excludeScope>
+                            <includeScope>compile</includeScope>
+                            
<outputDirectory>${project.build.directory}/dependency/trino</outputDirectory>
+                            <overWriteReleases>false</overWriteReleases>
+                            <overWriteSnapshots>false</overWriteSnapshots>
+                            <overWriteIfNewer>true</overWriteIfNewer>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh 
b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
new file mode 100755
index 000000000..3763e6e39
--- /dev/null
+++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh
@@ -0,0 +1,137 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+#
+# Resolve symlinks so that PRG names the real script file - $0 may be a softlink.
+PRG="${0}"
+
+# Remember whether the script runs under Cygwin.
+[[ $(uname -s) == *"CYGWIN"* ]] && CYGWIN=true
+
+# Follow the link chain one hop at a time, reading the "-> target" suffix printed by 'ls -ld'.
+while [ -h "${PRG}" ]; do
+  ls=$(ls -ld "${PRG}")
+  link=$(expr "$ls" : '.*-> \(.*\)$')
+  if expr "$link" : '/.*' > /dev/null; then
+    # Absolute link target: use it as is.
+    PRG="$link"
+  else
+    # Relative link target: resolve it against the directory that holds the link.
+    PRG=$(dirname "${PRG}")/"$link"
+  fi
+done
+
+# BASEDIR is the parent of the directory that contains the script.
+# The paths are quoted so that an install location containing whitespace is not word-split.
+BASEDIR=$(dirname "${PRG}")
+BASEDIR=$(cd "${BASEDIR}/.."; pwd)
+
+# Locate java and jar: under JAVA_HOME when it is set, otherwise on PATH.
+if test -z "${JAVA_HOME}"
+then
+    JAVA_BIN=$(which java)
+    JAR_BIN=$(which jar)
+else
+    JAVA_BIN="${JAVA_HOME}/bin/java"
+    JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+# Abort early when either executable is missing (an empty result from 'which' also fails this check).
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+  echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
+  exit 1
+fi
+
+for i in "${BASEDIR}/lib/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then
+    ATLAS_CONF_DIR="${BASEDIR}/conf/"
+fi
+ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
+
+# log dir for applications
+ATLAS_LOG_DIR="${BASEDIR}/log"
+export ATLAS_LOG_DIR
+LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log"
+
+TIME=`date +%Y%m%d%H%M%s`
+
+CP="${ATLASCPPATH}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ]
+then
+   ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}`
+   LOGFILE=`cygpath -w ${LOGFILE}`
+   HIVE_CP=`cygpath -w ${HIVE_CP}`
+   HADOOP_CP=`cygpath -w ${HADOOP_CP}`
+   CP=`cygpath -w -p ${CP}`
+fi
+
+JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR 
-Datlas.log.file=atlas-trino-extractor.log
+-Dlogback.configurationFile=atlas-trino-extractor-logback.xml 
-Datlas.properties=atlas-trino-extractor.properties 
-Djdk.httpclient.HttpClient.log=requests"
+
+# Arguments collected for the Java command line; filled in by the loop below.
+IMPORT_ARGS=()
+# Extra JVM arguments; left empty here.
+JVM_ARGS=
+
+# Disable pathname expansion (globbing) from here on.
+set -f
+# Consume the positional parameters one option at a time.
+# Arguments that do not start with '-' and are not consumed as an option value
+# match no pattern below and are skipped.
+while true
+do
+  option=${1}
+  shift
+
+  case "${option}" in
+    # Single-value options: forward the option together with the argument that follows it.
+    -c) IMPORT_ARGS+=("-c" "$1"); shift;;
+    -s) IMPORT_ARGS+=("-s" "$1"); shift;;
+    -t) IMPORT_ARGS+=("-t" "$1"); shift;;
+    # Cron expression: join every following argument, up to the next one starting
+    # with '-' (or the end of the arguments), into one space-separated value.
+    -cx)
+        CRON_EXPR="$1"
+        shift
+        while [[ "$1" != "" && "$1" != -* ]]; do
+          CRON_EXPR="$CRON_EXPR $1"
+          shift
+        done
+             IMPORT_ARGS+=("-cx" "$CRON_EXPR");;
+    # Help: remember that help was requested so the import status messages are suppressed.
+    -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");;
+    # Long forms of the options above.
+    --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;;
+    --table) IMPORT_ARGS+=("--table" "$1"); shift;;
+    --schema) IMPORT_ARGS+=("--schema" "$1"); shift;;
+    --cronExpression)
+          CRON_EXPR="$1"
+          shift
+          while [[ "$1" != "" && "$1" != -* ]]; do
+            CRON_EXPR="$CRON_EXPR $1"
+            shift
+          done
+             IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");;
+    --help) export HELP_OPTION="true"; IMPORT_ARGS+=("--help");;
+    # Any other dash-prefixed argument is rejected: request help output and stop parsing.
+    -*)
+           echo "Invalid argument found"
+           export HELP_OPTION="true"; IMPORT_ARGS+=("--help")
+           break;;
+    # Empty argument: nothing left to parse.
+    "") break;;
+  esac
+done
+
+JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}"
+
+if [ -z "${HELP_OPTION}" ]; then
+  echo "Log file for import is $LOGFILE"
+fi
+
+# Run the extractor and record its exit status right away: any later command (even 'set +f') overwrites $?.
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}"
+RETVAL=$?
+
+set +f
+if [ -z "${HELP_OPTION}" ]; then
+  [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully!
+  [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at: $LOGFILE for details.
+fi
+
+exit $RETVAL
diff --git 
a/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml 
b/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml
new file mode 100644
index 000000000..4cb6e4259
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-trino-extractor-logback.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <appender name="FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${atlas.log.dir}/${atlas.log.file}</file>
+        <append>true</append>
+        <encoder>
+            <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern>
+        </encoder>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern>
+            <maxHistory>20</maxHistory>
+            <cleanHistoryOnStart>true</cleanHistoryOnStart>
+        </rollingPolicy>
+    </appender>
+
+    <logger name="org.apache.atlas" additivity="false" level="info">
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <!-- to avoid logs - The configuration log.flush.interval.messages = 1 was 
supplied but isn't a known config -->
+    <logger name="org.apache.kafka.common.config.AbstractConfig" 
additivity="false" level="error">
+        <appender-ref ref="FILE"/>
+    </logger>
+
+    <root level="warn">
+        <appender-ref ref="FILE"/>
+    </root>
+</configuration>
diff --git 
a/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties 
b/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties
new file mode 100755
index 000000000..96d79bb33
--- /dev/null
+++ b/addons/trino-extractor/src/main/conf/atlas-trino-extractor.properties
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######## Atlas connection ############
+atlas.rest.address=http://localhost:21000/
+
+######## Trino jdbc url, If SSL is enabled, append below value with 
";SSL=true" ##########
+atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/
+
+######## Trino Server Authentication ############
+atlas.trino.jdbc.user=<username>
+
+######## Trino environment name - default value "cm" ######
+#atlas.trino.namespace=cm
+
+######## Catalogs for which extractor is to run ######
+#atlas.trino.catalogs.registered=
+
+######## Datasource for which Atlas hook is enabled########
+#atlas.trino.catalog.hook.enabled.hive_catalog=true
+#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
+
+######## To run extractor for specific catalog, schema or table ########
+#atlas.trino.extractor.catalog=
+#atlas.trino.extractor.schema=
+#atlas.trino.extractor.table=
+
+######## Cron Expression to run the extractor (below is an example that runs every hour) ########
+#atlas.trino.extractor.schedule=0 0 * * * ? 
\ No newline at end of file
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
new file mode 100644
index 000000000..dee04300c
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+
+/**
+ * Carries everything a single extractor run needs: the Atlas configuration, the Atlas and
+ * Trino client helpers, the namespace, the optional catalog/schema/table selection and the
+ * optional cron expression.
+ *
+ * <p>The cron expression given on the command line wins over the configured one. The
+ * catalog/schema/table selection is taken from the command line only when a catalog is
+ * given there; otherwise all three values are read from the configuration.
+ */
+public class ExtractorContext {
+    static final String TRINO_NAMESPACE_CONF         = "atlas.trino.namespace";
+    static final String DEFAULT_TRINO_NAMESPACE      = "cm";
+    static final String TRINO_CATALOG_CONF           = "atlas.trino.extractor.catalog";
+    static final String TRINO_SCHEMA_CONF            = "atlas.trino.extractor.schema";
+    static final String TRINO_TABLE_CONF             = "atlas.trino.extractor.table";
+    static final String TRINO_SCHEDULE_CONF          = "atlas.trino.extractor.schedule";
+    static final String OPTION_CATALOG_SHORT         = "c";
+    static final String OPTION_CATALOG_LONG          = "catalog";
+    static final String OPTION_SCHEMA_SHORT          = "s";
+    static final String OPTION_SCHEMA_LONG           = "schema";
+    static final String OPTION_TABLE_SHORT           = "t";
+    static final String OPTION_TABLE_LONG            = "table";
+    static final String OPTION_CRON_EXPRESSION_SHORT = "cx";
+    static final String OPTION_CRON_EXPRESSION_LONG  = "cronExpression";
+    static final String OPTION_HELP_SHORT            = "h";
+    static final String OPTION_HELP_LONG             = "help";
+
+    private final Configuration     atlasConf;
+    private final String            namespace;
+    private final String            catalog;
+    private final String            schema;
+    private final String            table;
+    private final AtlasClientHelper atlasClientHelper;
+    private final TrinoClientHelper trinoClientHelper;
+    private final String            cronExpression;
+
+    /**
+     * Loads the Atlas application properties, creates both client helpers and resolves the
+     * run parameters from the parsed command line and the configuration.
+     *
+     * @param cmd parsed command line
+     * @throws AtlasException declared by the application-properties lookup
+     * @throws IOException declared by the Atlas client helper constructor
+     */
+    public ExtractorContext(CommandLine cmd) throws AtlasException, IOException {
+        // Configuration first, then the Atlas helper, then the Trino helper.
+        this.atlasConf         = ApplicationProperties.get();
+        this.atlasClientHelper = new AtlasClientHelper(atlasConf);
+        this.trinoClientHelper = new TrinoClientHelper(atlasConf);
+        this.namespace         = atlasConf.getString(TRINO_NAMESPACE_CONF, DEFAULT_TRINO_NAMESPACE);
+
+        String scheduleArg = cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT);
+        String catalogArg  = cmd.getOptionValue(OPTION_CATALOG_SHORT);
+
+        if (StringUtils.isEmpty(scheduleArg)) {
+            this.cronExpression = atlasConf.getString(TRINO_SCHEDULE_CONF);
+        } else {
+            this.cronExpression = scheduleArg;
+        }
+
+        if (StringUtils.isNotEmpty(catalogArg)) {
+            // A catalog on the command line switches the whole selection to command-line values.
+            this.catalog = catalogArg;
+            this.schema  = cmd.getOptionValue(OPTION_SCHEMA_SHORT);
+            this.table   = cmd.getOptionValue(OPTION_TABLE_SHORT);
+        } else {
+            this.catalog = atlasConf.getString(TRINO_CATALOG_CONF);
+            this.schema  = atlasConf.getString(TRINO_SCHEMA_CONF);
+            this.table   = atlasConf.getString(TRINO_TABLE_CONF);
+        }
+    }
+
+    /** @return the Atlas application configuration loaded by the constructor */
+    public Configuration getAtlasConf() {
+        return atlasConf;
+    }
+
+    /** @return the Atlas client helper created by the constructor */
+    public AtlasClientHelper getAtlasConnector() {
+        return atlasClientHelper;
+    }
+
+    /** @return the Trino client helper created by the constructor */
+    public TrinoClientHelper getTrinoConnector() {
+        return trinoClientHelper;
+    }
+
+    /** @return the configured namespace, or {@code "cm"} when none is configured */
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /** @return the selected catalog; may be {@code null} */
+    public String getCatalog() {
+        return catalog;
+    }
+
+    /** @return the selected schema; may be {@code null} */
+    public String getSchema() {
+        return schema;
+    }
+
+    /** @return the selected table; may be {@code null} */
+    public String getTable() {
+        return table;
+    }
+
+    /** @return the cron expression from the command line or the configuration; may be {@code null} */
+    public String getCronExpression() {
+        return cronExpression;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
new file mode 100644
index 000000000..695f6b3c1
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Synchronizes Trino metadata into Atlas.
+ * <p>
+ * One call to {@link #execute(ExtractorContext)} creates or updates Atlas entities for the catalogs,
+ * schemas and tables reported by Trino, then deletes Atlas entities whose names Trino no longer
+ * reports. Failures on an individual catalog, schema or table are logged and do not stop the
+ * remaining work.
+ */
+public class ExtractorService {
+    /** Upper bound on the number of catalogs processed in parallel. */
+    public static final int    THREAD_POOL_SIZE          = 5;
+    /** Maximum time, in minutes, to wait for all catalog tasks to finish. */
+    public static final int    CATALOG_EXECUTION_TIMEOUT = 60;
+    /** Entity attribute compared against the Trino catalog/schema/table name when looking for stale entities. */
+    public static final String TRINO_NAME_ATTRIBUTE      = "name";
+    private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class);
+    private static final String TRINO_CATALOG_REGISTERED          = "atlas.trino.catalogs.registered";
+    private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled.";
+    private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace";
+
+    /**
+     * Runs one extraction pass: processes the selected catalogs, then removes catalogs that exist in
+     * Atlas but are no longer reported by Trino.
+     *
+     * @param context extraction settings plus the Trino and Atlas client helpers
+     * @return {@code true} when the pass finishes without throwing
+     * @throws Exception if listing the Trino catalogs fails or an Atlas call made at this level fails;
+     *                   an {@link AtlasServiceException} is propagated unchanged
+     */
+    public boolean execute(ExtractorContext context) throws Exception {
+        Map<String, String> catalogs = context.getTrinoConnector().getAllTrinoCatalogs();
+
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet());
+
+        // AtlasServiceException is deliberately not caught and re-wrapped here: wrapping it in another
+        // AtlasServiceException only hides the original failure one level deeper in the cause chain.
+        processCatalogs(context, catalogs);
+
+        deleteCatalogs(context, catalogs);
+
+        return true;
+    }
+
+    /**
+     * Selects the catalogs to extract and processes each one on a worker thread.
+     * <p>
+     * When the context names no catalog, every catalog listed under
+     * {@code atlas.trino.catalogs.registered} that Trino also reports is selected. Otherwise only the
+     * named catalog is selected, restricted to the schema and table given in the context.
+     *
+     * @param context        extraction settings and client helpers
+     * @param catalogInTrino catalogs reported by Trino; the value is passed on as the connector type
+     * @throws AtlasServiceException if the Trino instance entity cannot be created or updated
+     */
+    private void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+        } else {
+            List<Catalog> catalogsToProcess = new ArrayList<>();
+
+            if (StringUtils.isEmpty(context.getCatalog())) {
+                String[] registeredCatalogs = context.getAtlasConf().getStringArray(TRINO_CATALOG_REGISTERED);
+
+                if (registeredCatalogs != null) {
+                    for (String registeredCatalog : registeredCatalogs) {
+                        if (catalogInTrino.containsKey(registeredCatalog)) {
+                            catalogsToProcess.add(getCatalogInstance(context, registeredCatalog, catalogInTrino.get(registeredCatalog)));
+                        }
+                    }
+                }
+            } else {
+                if (catalogInTrino.containsKey(context.getCatalog())) {
+                    Catalog catalog = getCatalogInstance(context, context.getCatalog(), catalogInTrino.get(context.getCatalog()));
+
+                    catalog.setSchemaToImport(context.getSchema());
+                    catalog.setTableToImport(context.getTable());
+
+                    catalogsToProcess.add(catalog);
+                }
+            }
+
+            if (CollectionUtils.isEmpty(catalogsToProcess)) {
+                LOG.info("No catalogs found to extract");
+            } else {
+                LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+
+                AtlasEntityWithExtInfo trinoInstanceEntity = context.getAtlasConnector().createOrUpdateInstanceEntity(context.getNamespace());
+                ExecutorService        catalogExecutor     = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE));
+
+                try {
+                    for (Catalog currentCatalog : catalogsToProcess) {
+                        catalogExecutor.submit(() -> {
+                            try {
+                                currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                                processCatalog(context, currentCatalog);
+                            } catch (Exception e) {
+                                // one failing catalog must not prevent the others from being extracted
+                                LOG.error("Error processing catalog: {}", currentCatalog, e);
+                            }
+                        });
+                    }
+                } finally {
+                    // stop accepting new tasks; the ones already submitted keep running
+                    catalogExecutor.shutdown();
+                }
+
+                try {
+                    if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) {
+                        LOG.warn("Catalog processing did not complete within {} minutes", CATALOG_EXECUTION_TIMEOUT);
+
+                        // Without this the unfinished tasks would keep running in the background while
+                        // the caller moves on to deletion and, in scheduled mode, to the next run.
+                        catalogExecutor.shutdownNow();
+                    }
+                } catch (InterruptedException e) {
+                    catalogExecutor.shutdownNow();
+                    Thread.currentThread().interrupt();
+
+                    LOG.error("Catalog processing was interrupted", e);
+                }
+
+                LOG.info("Catalogs scan completed");
+            }
+        }
+    }
+
+    /**
+     * Creates or updates the Atlas entity for one catalog, extracts its schemas and, when the context
+     * has no schema filter, deletes schemas that Atlas still holds but Trino did not return.
+     *
+     * @param context extraction settings and client helpers
+     * @param catalog catalog to extract; {@code null} is ignored
+     * @throws AtlasServiceException if the catalog entity cannot be created or updated
+     * @throws SQLException          if the schemas cannot be read from Trino
+     */
+    private void processCatalog(ExtractorContext context, Catalog catalog) throws AtlasServiceException, SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting catalog: {}", catalog.getName());
+
+            String                 catalogName        = catalog.getName();
+            AtlasEntityWithExtInfo trinoCatalogEntity = context.getAtlasConnector().createOrUpdateCatalogEntity(catalog);
+            List<String>           schemas            = context.getTrinoConnector().getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+
+            LOG.info("Found {} schemas in catalog {}", schemas.size(), catalogName);
+
+            processSchemas(context, catalog, trinoCatalogEntity.getEntity(), schemas);
+
+            // with a schema filter the list above is partial, so it cannot be used to detect stale schemas
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(context, schemas, trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    /**
+     * Creates or updates the Atlas entity for each schema, extracts its tables and, when the context
+     * has no table filter, deletes tables that Atlas still holds but Trino did not return. A failure
+     * on one schema is logged and the loop continues with the next.
+     *
+     * @param context            extraction settings and client helpers
+     * @param catalog            catalog that owns the schemas
+     * @param trinoCatalogEntity Atlas entity of that catalog
+     * @param schemaToImport     names of the schemas to extract
+     */
+    private void processSchemas(ExtractorContext context, Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) {
+        for (String schemaName : schemaToImport) {
+            LOG.info("Started extracting schema: {}", schemaName);
+
+            try {
+                AtlasEntityWithExtInfo           schemaEntity = context.getAtlasConnector().createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName);
+                Map<String, Map<String, Object>> tables       = context.getTrinoConnector().getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport());
+
+                LOG.info("Found {} tables in schema {}.{}", tables.size(), catalog.getName(), schemaName);
+
+                processTables(context, catalog, schemaName, schemaEntity.getEntity(), tables);
+
+                // with a table filter the map above is partial, so it cannot be used to detect stale tables
+                if (StringUtils.isEmpty(context.getTable())) {
+                    deleteTables(context, new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid());
+                }
+            } catch (Exception e) {
+                LOG.error("Error processing schema: {}", schemaName, e);
+            }
+        }
+    }
+
+    /**
+     * Reads the columns of each table from Trino and creates or updates the table entity in Atlas.
+     * A failure on one table is logged and the loop continues with the next.
+     *
+     * @param context      extraction settings and client helpers
+     * @param catalog      catalog that owns the tables
+     * @param schemaName   schema that owns the tables
+     * @param schemaEntity Atlas entity of that schema
+     * @param trinoTables  tables to extract, keyed by table name
+     */
+    private void processTables(ExtractorContext context, Catalog catalog, String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>> trinoTables) {
+        for (String trinoTableName : trinoTables.keySet()) {
+            LOG.info("Started extracting table: {}", trinoTableName);
+
+            try {
+                Map<String, Map<String, Object>> trinoColumns = context.getTrinoConnector().getTrinoColumns(catalog.getName(), schemaName, trinoTableName);
+
+                LOG.info("Found {} columns in table {}.{}.{}", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName);
+
+                context.getAtlasConnector().createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, schemaEntity);
+            } catch (Exception e) {
+                LOG.error("Error processing table: {}", trinoTableName, e);
+            }
+        }
+    }
+
+    /**
+     * Deletes catalogs (with their schemas and tables) that exist under the Trino instance in Atlas
+     * but are not among the catalogs reported by Trino. Does nothing when the context names a
+     * catalog. A failure on one catalog is logged and the loop continues with the next.
+     *
+     * @param context        extraction settings and client helpers
+     * @param catalogInTrino catalogs reported by Trino
+     * @throws AtlasServiceException if the Trino instance or its catalogs cannot be looked up in Atlas
+     */
+    private void deleteCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException {
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            AtlasEntityHeader trinoInstance = context.getAtlasConnector().getTrinoInstance(context.getNamespace());
+
+            if (trinoInstance != null) {
+                Set<String> catalogsToDelete = getCatalogsToDelete(context, catalogInTrino, trinoInstance.getGuid());
+
+                if (CollectionUtils.isNotEmpty(catalogsToDelete)) {
+                    // log the count, as deleteSchemas/deleteTables do, rather than the whole GUID set
+                    LOG.info("Atlas has {} catalogs in instance {} that are no more present in Trino, starting to delete", catalogsToDelete.size(), trinoInstance.getGuid());
+
+                    for (String catalogGuid : catalogsToDelete) {
+                        try {
+                            // an empty "present in Trino" list marks every schema of the catalog for deletion
+                            deleteSchemas(context, Collections.emptyList(), catalogGuid);
+
+                            LOG.info("Deleting catalog: {}", catalogGuid);
+
+                            context.getAtlasConnector().deleteByGuid(Collections.singleton(catalogGuid));
+                        } catch (AtlasServiceException e) {
+                            LOG.error("Error deleting catalog: {}", catalogGuid, e);
+                        }
+                    }
+                } else {
+                    LOG.info("Atlas has no catalogs to delete");
+                }
+            }
+
+            LOG.info("Catalogs scanned for deletion completed");
+        }
+    }
+
+    /**
+     * Deletes schemas (with their tables) that exist under the catalog in Atlas but are not in
+     * {@code schemasInTrino}. All Atlas failures are logged, never thrown.
+     *
+     * @param context        extraction settings and client helpers
+     * @param schemasInTrino schema names to keep; an empty list deletes every schema of the catalog
+     * @param catalogGuid    GUID of the catalog entity in Atlas
+     */
+    private void deleteSchemas(ExtractorContext context, List<String> schemasInTrino, String catalogGuid) {
+        try {
+            Set<String> schemasToDelete = getSchemasToDelete(context, schemasInTrino, catalogGuid);
+
+            if (CollectionUtils.isNotEmpty(schemasToDelete)) {
+                LOG.info("Atlas has {} schemas in catalog {} that are no more present in Trino, starting to delete", schemasToDelete.size(), catalogGuid);
+
+                for (String schemaGuid : schemasToDelete) {
+                    try {
+                        // an empty "present in Trino" list marks every table of the schema for deletion
+                        deleteTables(context, Collections.emptyList(), schemaGuid);
+
+                        LOG.info("Deleting schema: {}", schemaGuid);
+
+                        context.getAtlasConnector().deleteByGuid(Collections.singleton(schemaGuid));
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Error in deleting schema: {}", schemaGuid, e);
+                    }
+                }
+            } else {
+                LOG.info("Atlas has no schemas to delete in catalog {}", catalogGuid);
+            }
+        } catch (AtlasServiceException e) {
+            LOG.error("Error deleting schemas in catalog {}", catalogGuid, e);
+        }
+    }
+
+    /**
+     * Deletes tables that exist under the schema in Atlas but are not in {@code tablesInTrino}.
+     * All Atlas failures are logged, never thrown.
+     *
+     * @param context       extraction settings and client helpers
+     * @param tablesInTrino table names to keep; an empty list deletes every table of the schema
+     * @param schemaGuid    GUID of the schema entity in Atlas
+     */
+    private void deleteTables(ExtractorContext context, List<String> tablesInTrino, String schemaGuid) {
+        try {
+            Set<String> tablesToDelete = getTablesToDelete(context, tablesInTrino, schemaGuid);
+
+            if (CollectionUtils.isNotEmpty(tablesToDelete)) {
+                LOG.info("Atlas has {} tables in schema {} that are no more present in Trino, starting to delete", tablesToDelete.size(), schemaGuid);
+
+                for (String tableGuid : tablesToDelete) {
+                    try {
+                        LOG.info("Deleting table: {}", tableGuid);
+
+                        context.getAtlasConnector().deleteByGuid(Collections.singleton(tableGuid));
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Error deleting table: {}", tableGuid, e);
+                    }
+                }
+            } else {
+                LOG.info("Atlas has no tables to delete in schema {}", schemaGuid);
+            }
+        } catch (AtlasServiceException e) {
+            LOG.error("Error deleting tables in schema: {}", schemaGuid, e);
+        }
+    }
+
+    /**
+     * Builds the {@link Catalog} model for a Trino catalog, reading its hook settings from the Atlas
+     * configuration: {@code atlas.trino.catalog.hook.enabled.<catalog>} (boolean, default
+     * {@code false}) and, when that is enabled,
+     * {@code atlas.trino.catalog.hook.enabled.<catalog>.namespace}.
+     *
+     * @param context       extraction settings; supplies the configuration and the namespace
+     * @param catalogName   Trino catalog name
+     * @param connectorType connector type reported by Trino for the catalog
+     * @return the catalog model, or {@code null} when {@code catalogName} is {@code null}
+     */
+    private static Catalog getCatalogInstance(ExtractorContext context, String catalogName, String connectorType) {
+        Catalog ret = null;
+
+        if (catalogName != null) {
+            boolean isHookEnabled = context.getAtlasConf().getBoolean(TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName, false);
+            String  hookNamespace = null;
+
+            if (isHookEnabled) {
+                String hookNamespaceProperty = TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName + TRINO_CATALOG_HOOK_ENABLED_SUFFIX;
+
+                hookNamespace = context.getAtlasConf().getString(hookNamespaceProperty);
+
+                // the warning says "found empty", so a blank value must trigger it as well as a missing one
+                if (StringUtils.isEmpty(hookNamespace)) {
+                    LOG.warn("Atlas Hook is enabled for {}, but '{}' found empty", catalogName, hookNamespaceProperty);
+                }
+            }
+
+            ret = new Catalog(catalogName, connectorType, isHookEnabled, hookNamespace, context.getNamespace());
+        }
+
+        return ret;
+    }
+
+    /**
+     * Finds catalogs that exist under the instance in Atlas but are not reported by Trino. Entities
+     * without a {@code name} attribute are never selected.
+     *
+     * @param context        supplies the Atlas client helper
+     * @param catalogInTrino catalogs reported by Trino, keyed by name; {@code null} is treated as empty
+     * @param instanceGuid   GUID of the Trino instance entity; {@code null} yields an empty result
+     * @return GUIDs of the catalogs to delete, possibly empty
+     * @throws AtlasServiceException if the catalogs cannot be listed from Atlas
+     */
+    private static Set<String> getCatalogsToDelete(ExtractorContext context, Map<String, String> catalogInTrino, String instanceGuid) throws AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (instanceGuid != null) {
+            List<AtlasEntityHeader> catalogsInAtlas = context.getAtlasConnector().getAllCatalogsInInstance(instanceGuid);
+
+            if (CollectionUtils.isNotEmpty(catalogsInAtlas)) {
+                Map<String, String> finalCatalogInTrino = catalogInTrino == null ? Collections.emptyMap() : catalogInTrino;
+
+                ret = catalogsInAtlas.stream()
+                        .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> !finalCatalogInTrino.containsKey(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Finds schemas that exist under the catalog in Atlas but are not in {@code schemasInTrino}.
+     * Entities without a {@code name} attribute are never selected.
+     *
+     * @param context        supplies the Atlas client helper
+     * @param schemasInTrino schema names reported by Trino; {@code null} is treated as empty
+     * @param catalogGuid    GUID of the catalog entity; {@code null} yields an empty result
+     * @return GUIDs of the schemas to delete, possibly empty
+     * @throws AtlasServiceException if the schemas cannot be listed from Atlas
+     */
+    private static Set<String> getSchemasToDelete(ExtractorContext context, List<String> schemasInTrino, String catalogGuid) throws AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (catalogGuid != null) {
+            List<AtlasEntityHeader> schemasInAtlas = context.getAtlasConnector().getAllSchemasInCatalog(catalogGuid);
+
+            if (CollectionUtils.isNotEmpty(schemasInAtlas)) {
+                List<String> finalSchemasInTrino = schemasInTrino == null ? Collections.emptyList() : schemasInTrino;
+
+                ret = schemasInAtlas.stream()
+                        .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> !finalSchemasInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Finds tables that exist under the schema in Atlas but are not in {@code tablesInTrino}.
+     * Entities without a {@code name} attribute are never selected.
+     *
+     * @param context       supplies the Atlas client helper
+     * @param tablesInTrino table names reported by Trino; {@code null} is treated as empty
+     * @param schemaGuid    GUID of the schema entity; {@code null} yields an empty result
+     * @return GUIDs of the tables to delete, possibly empty
+     * @throws AtlasServiceException if the tables cannot be listed from Atlas
+     */
+    private static Set<String> getTablesToDelete(ExtractorContext context, List<String> tablesInTrino, String schemaGuid) throws AtlasServiceException {
+        Set<String> ret = Collections.emptySet();
+
+        if (schemaGuid != null) {
+            List<AtlasEntityHeader> tablesInAtlas = context.getAtlasConnector().getAllTablesInSchema(schemaGuid);
+
+            if (CollectionUtils.isNotEmpty(tablesInAtlas)) {
+                List<String> finalTablesInTrino = tablesInTrino == null ? Collections.emptyList() : tablesInTrino;
+
+                ret = tablesInAtlas.stream()
+                        .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null)
+                        .filter(entity -> !finalTablesInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE)))
+                        .map(AtlasEntityHeader::getGuid)
+                        .collect(Collectors.toSet());
+            }
+        }
+
+        return ret;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
new file mode 100644
index 000000000..7db16e7a6
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.StringUtils;
+import org.quartz.CronExpression;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.impl.StdSchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT;
+import static 
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG;
+import static 
org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG;
+import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT;
+
+/**
+ * Command-line entry point of the Trino extractor.
+ * <p>
+ * Parses the command line into an {@link ExtractorContext} and then either runs a single extraction
+ * or, when a cron expression is configured, schedules {@link MetadataJob} with Quartz and blocks
+ * until the main thread is interrupted. The process exits with {@code 0} on success, {@code 1} on
+ * failure and {@code 2} when only the help message was requested.
+ */
+public class TrinoExtractor {
+    private static final Logger LOG = LoggerFactory.getLogger(TrinoExtractor.class);
+
+    private static final int EXIT_CODE_SUCCESS = 0;
+    private static final int EXIT_CODE_FAILED  = 1;
+    private static final int EXIT_CODE_HELP    = 2;
+
+    // Assigned by main(). MetadataJob reads the context through this static because the job is
+    // registered by class (JobBuilder.newJob(MetadataJob.class)), not by instance.
+    private static TrinoExtractor instance;
+
+    private final ExtractorContext extractorContext;
+    private       int              exitCode = EXIT_CODE_FAILED;
+
+    /**
+     * Parses the arguments and builds the extractor context.
+     *
+     * @param args command-line arguments
+     * @throws Exception if the arguments are invalid or the context cannot be created
+     */
+    private TrinoExtractor(String[] args) throws Exception {
+        extractorContext = createExtractorContext(args);
+    }
+
+    /**
+     * Program entry point; always terminates the JVM through {@link System#exit(int)}.
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String[] args) {
+        try {
+            instance = new TrinoExtractor(args);
+
+            if (instance.extractorContext != null) {
+                instance.run();
+            } else if (instance.exitCode != EXIT_CODE_HELP) {
+                // A null context after a help request is expected: keep EXIT_CODE_HELP and stay quiet.
+                LOG.error("Extractor context is null. Cannot proceed with extraction.");
+
+                instance.exitCode = EXIT_CODE_FAILED;
+            }
+        } catch (Exception e) {
+            LOG.error("Extraction failed", e);
+
+            // instance is still null when the constructor itself threw (e.g. invalid arguments)
+            if (instance != null) {
+                instance.exitCode = EXIT_CODE_FAILED;
+            }
+        }
+
+        System.exit(instance != null ? instance.exitCode : EXIT_CODE_FAILED);
+    }
+
+    /**
+     * Runs the extraction once, or schedules it when the context carries a cron expression, then
+     * closes the Atlas connector and exits the JVM with the resulting exit code.
+     */
+    private void run() {
+        try {
+            String cronExpression = extractorContext.getCronExpression();
+
+            if (StringUtils.isNotEmpty(cronExpression)) {
+                if (!CronExpression.isValidExpression(cronExpression)) {
+                    LOG.error("Invalid cron expression: {}", cronExpression);
+
+                    exitCode = EXIT_CODE_FAILED;
+                } else {
+                    LOG.info("Scheduling extraction for cron expression: {}", cronExpression);
+
+                    JobDetail job = JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", "group1").build();
+                    Trigger trigger = TriggerBuilder.newTrigger()
+                            .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow()
+                            .build();
+
+                    Scheduler scheduler = null;
+
+                    try {
+                        scheduler = new StdSchedulerFactory().getScheduler();
+
+                        scheduler.scheduleJob(job, trigger);
+                        scheduler.start();
+
+                        try {
+                            // joining the current thread never returns on its own; it parks the main
+                            // thread until it is interrupted
+                            Thread.currentThread().join();
+                        } catch (InterruptedException ie) {
+                            LOG.info("Main thread interrupted, shutting down scheduler.");
+
+                            exitCode = EXIT_CODE_SUCCESS;
+                        }
+                    } finally {
+                        try {
+                            if (scheduler != null && !scheduler.isShutdown()) {
+                                scheduler.shutdown(true);
+                            }
+                        } catch (SchedulerException se) {
+                            LOG.warn("Error shutting down scheduler: {}", se.getMessage());
+                        }
+                    }
+                }
+            } else {
+                LOG.info("Running extraction once");
+
+                ExtractorService extractorService = new ExtractorService();
+
+                if (extractorService.execute(extractorContext)) {
+                    exitCode = EXIT_CODE_SUCCESS;
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error encountered. exitCode: {}", exitCode, e);
+        } finally {
+            if (extractorContext.getAtlasConnector() != null) {
+                extractorContext.getAtlasConnector().close();
+            }
+        }
+
+        System.exit(exitCode);
+    }
+
+    /**
+     * Parses the command line and creates the extractor context.
+     *
+     * @param args command-line arguments
+     * @return the context, or {@code null} when the help option was given (usage is printed and the
+     *         exit code is set to {@code EXIT_CODE_HELP})
+     * @throws AtlasBaseException if the arguments are invalid or the application properties cannot
+     *                            be loaded
+     * @throws IOException        if creating the context fails with an I/O error
+     */
+    private ExtractorContext createExtractorContext(String[] args) throws AtlasBaseException, IOException {
+        Options acceptedCliOptions = prepareCommandLineOptions();
+
+        try {
+            CommandLine  cmd              = new BasicParser().parse(acceptedCliOptions, args, true);
+            List<String> argsNotProcessed = cmd.getArgList();
+
+            if (argsNotProcessed != null && !argsNotProcessed.isEmpty()) {
+                throw new AtlasBaseException("Unrecognized arguments.");
+            }
+
+            ExtractorContext ret = null;
+
+            if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) {
+                printUsage(acceptedCliOptions);
+                exitCode = EXIT_CODE_HELP;
+            } else {
+                ret = new ExtractorContext(cmd);
+                LOG.debug("Successfully initialized the extractor context.");
+            }
+
+            return ret;
+        } catch (ParseException | AtlasBaseException e) {
+            printUsage(acceptedCliOptions);
+
+            throw new AtlasBaseException("Invalid arguments. Reason: " + e.getMessage(), e);
+        } catch (AtlasException e) {
+            throw new AtlasBaseException("Error in getting Application Properties. Reason: " + e.getMessage(), e);
+        }
+        // IOException is declared by this method and propagates as-is; re-wrapping it in another
+        // IOException added nothing.
+    }
+
+    /**
+     * @return the options accepted on the command line: catalog, schema, table, cron expression and help
+     */
+    private static Options prepareCommandLineOptions() {
+        Options acceptedCliOptions = new Options();
+
+        return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT, OPTION_CATALOG_LONG, true, "Catalog name")
+                .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true, "Schema name")
+                .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name")
+                .addOption(OPTION_CRON_EXPRESSION_SHORT, OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction")
+                .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print help message");
+    }
+
+    /**
+     * Prints the usage message for the given options.
+     *
+     * @param options options to describe
+     */
+    private static void printUsage(Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(TrinoExtractor.class.getName(), options);
+    }
+
+    /**
+     * Quartz job that runs one extraction pass using the context of the running
+     * {@link TrinoExtractor}. Does nothing when no extractor instance or context is available.
+     */
+    @DisallowConcurrentExecution
+    public static class MetadataJob implements Job {
+        private final Logger logger = LoggerFactory.getLogger(MetadataJob.class);
+
+        /**
+         * Executes one extraction pass.
+         *
+         * @param context Quartz execution context (unused)
+         * @throws JobExecutionException wrapping any exception thrown by the extraction
+         */
+        @Override
+        public void execute(JobExecutionContext context) throws JobExecutionException {
+            ExtractorContext extractorContext = TrinoExtractor.instance != null ? TrinoExtractor.instance.extractorContext : null;
+
+            if (extractorContext != null) {
+                logger.info("Executing metadata extraction at: {}", java.time.LocalTime.now());
+
+                ExtractorService extractorService = new ExtractorService();
+
+                try {
+                    if (extractorService.execute(extractorContext)) {
+                        TrinoExtractor.instance.exitCode = EXIT_CODE_SUCCESS;
+                    }
+                } catch (Exception e) {
+                    logger.error("Error encountered: ", e);
+                    throw new JobExecutionException(e);
+                }
+                logger.info("Completed executing metadata extraction at: {}", java.time.LocalTime.now());
+            }
+        }
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
new file mode 100644
index 000000000..9bc18c16c
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
+public class AtlasClientHelper {
+    public static final String TRINO_INSTANCE                   = 
"trino_instance";
+    public static final String TRINO_CATALOG                    = 
"trino_catalog";
+    public static final String TRINO_SCHEMA                     = 
"trino_schema";
+    public static final String TRINO_TABLE                      = 
"trino_table";
+    public static final String TRINO_COLUMN                     = 
"trino_column";
+    public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs";
+    public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE   = "schemas";
+    public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE     = "tables";
+    public static final String QUALIFIED_NAME_ATTRIBUTE         = 
"qualifiedName";
+    public static final String NAME_ATTRIBUTE                   = "name";
+    public static final int    DEFAULT_PAGE_LIMIT               = 10000;
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasClientHelper.class);
+    // Endpoint used when the configuration does not provide "atlas.rest.address".
+    private static final String DEFAULT_ATLAS_URL                        = "http://localhost:21000/";
+    private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT      = 
"atlas.rest.address";
+    private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE   = 
"connectorType";
+    private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE         = 
"instance";
+    private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP      = 
"trino_instance_catalog";
+    private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE           = 
"catalog";
+    private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP        = 
"trino_schema_catalog";
+    private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE         = 
"data_type";
+    private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = 
"ordinal_position";
+    private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE    = 
"column_default";
+    private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE       = 
"is_nullable";
+    private static final String TRINO_COLUMN_TABLE_ATTRIBUTE             = 
"table";
+    private static final String TRINO_TABLE_TYPE                         = 
"table_type";
+    private static final String TRINO_TABLE_COLUMN_RELATIONSHIP          = 
"trino_table_columns";
+    private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP          = 
"trino_table_schema";
+    private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE             = 
"trinoschema";
+    private static final String TRINO_TABLE_COLUMN_ATTRIBUTE             = 
"columns";
+
+    private static AtlasClientV2 atlasClientV2;
+
+    /**
+     * Creates a helper backed by the class-wide Atlas client, initialising that client from the
+     * given configuration if it has not been created yet.
+     *
+     * @param atlasConf configuration used to look up the Atlas endpoint; may be {@code null}
+     * @throws IOException propagated from client initialisation
+     */
+    public AtlasClientHelper(Configuration atlasConf) throws IOException {
+        AtlasClientHelper.atlasClientV2 = getAtlasClientV2Instance(atlasConf);
+    }
+
+    /**
+     * Fetches every entity reachable from a Trino instance through its {@code catalogs} relationship.
+     *
+     * @param instanceGuid guid of the Trino instance entity
+     * @return headers of the related catalog entities; empty when none are found
+     * @throws AtlasServiceException if the relationship search fails
+     */
+    public List<AtlasEntityHeader> getAllCatalogsInInstance(String instanceGuid) throws AtlasServiceException {
+        List<AtlasEntityHeader> catalogs = getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE);
+
+        if (CollectionUtils.isEmpty(catalogs)) {
+            LOG.debug("No catalog found in Trino instance {}", instanceGuid);
+        } else {
+            LOG.debug("Retrieved {} catalogs in Trino instance {}", catalogs.size(), instanceGuid);
+        }
+
+        return catalogs;
+    }
+
+    /**
+     * Fetches every entity reachable from a Trino catalog through its {@code schemas} relationship.
+     *
+     * @param catalogGuid guid of the Trino catalog entity
+     * @return headers of the related schema entities; empty when none are found
+     * @throws AtlasServiceException if the relationship search fails
+     */
+    public List<AtlasEntityHeader> getAllSchemasInCatalog(String catalogGuid) throws AtlasServiceException {
+        List<AtlasEntityHeader> schemas = getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE);
+
+        if (CollectionUtils.isEmpty(schemas)) {
+            LOG.debug("No schema found in Trino catalog {}", catalogGuid);
+        } else {
+            LOG.debug("Retrieved {} schemas in Trino catalog {}", schemas.size(), catalogGuid);
+        }
+
+        return schemas;
+    }
+
+    /**
+     * Fetches every entity reachable from a Trino schema through its {@code tables} relationship.
+     *
+     * @param schemaGuid guid of the Trino schema entity
+     * @return headers of the related table entities; empty when none are found
+     * @throws AtlasServiceException if the relationship search fails
+     */
+    public List<AtlasEntityHeader> getAllTablesInSchema(String schemaGuid) throws AtlasServiceException {
+        List<AtlasEntityHeader> tables = getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE);
+
+        if (CollectionUtils.isEmpty(tables)) {
+            LOG.debug("No table found in Trino schema {}", schemaGuid);
+        } else {
+            LOG.debug("Retrieved {} tables in Trino schema {}", tables.size(), schemaGuid);
+        }
+
+        return tables;
+    }
+
+    /**
+     * Looks up the {@code trino_instance} entity header whose qualified name equals the namespace.
+     *
+     * @param namespace qualified name of the Trino instance
+     * @return the entity header, or {@code null} if the lookup fails for any reason
+     */
+    public AtlasEntityHeader getTrinoInstance(String namespace) {
+        try {
+            return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE, Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace));
+        } catch (AtlasServiceException e) {
+            // The lookup stays best-effort (null on failure), but the cause is no longer dropped silently.
+            if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
+                LOG.debug("Trino instance {} not found", namespace);
+            } else {
+                LOG.warn("Failed to look up Trino instance {}", namespace, e);
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     * Returns the {@code trino_instance} entity for the namespace, creating it when it does not exist.
+     * The namespace is used both as the qualified name and as the display name.
+     *
+     * @param trinoNamespace namespace identifying the Trino instance
+     * @return the existing entity, or the result of creating a new one
+     * @throws AtlasServiceException if the lookup (other than not-found) or the creation fails
+     */
+    public AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String trinoNamespace) throws AtlasServiceException {
+        AtlasEntityWithExtInfo existing = findEntity(TRINO_INSTANCE, trinoNamespace, true, true);
+
+        if (existing != null) {
+            return existing;
+        }
+
+        AtlasEntity instanceEntity = new AtlasEntity(TRINO_INSTANCE);
+
+        instanceEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, trinoNamespace);
+        instanceEntity.setAttribute(NAME_ATTRIBUTE, trinoNamespace);
+
+        return createEntity(new AtlasEntityWithExtInfo(instanceEntity));
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateCatalogEntity(Catalog catalog) 
throws AtlasServiceException {
+        String catalogName    = catalog.getName();
+        String trinoNamespace = catalog.getInstanceName();
+        String qualifiedName  = String.format("%s@%s", catalogName, 
trinoNamespace);
+
+        AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG, qualifiedName, 
true, true);
+
+        if (ret == null) {
+            AtlasEntity entity = new AtlasEntity(TRINO_CATALOG);
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, catalogName);
+            entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE, 
catalog.getType());
+            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
 TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoCatalog(this, 
catalog.getHookInstanceName(), catalogName, entity, ret);
+            }
+
+            ret = createEntity(new AtlasEntityWithExtInfo(entity));
+        } else {
+            AtlasEntity entity = ret.getEntity();
+
+            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(),
 TRINO_CATALOG_INSTANCE_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoCatalog(this, 
catalog.getHookInstanceName(), catalogName, entity, ret);
+            }
+
+            ret.setEntity(entity);
+
+            updateEntity(ret);
+        }
+
+        return ret;
+    }
+
+    public AtlasEntityWithExtInfo createOrUpdateSchemaEntity(Catalog catalog, 
AtlasEntity catalogEntity, String schema) throws AtlasServiceException {
+        String                 qualifiedName = String.format("%s.%s@%s", 
catalog.getName(), schema, catalog.getInstanceName());
+        AtlasEntityWithExtInfo ret           = findEntity(TRINO_SCHEMA, 
qualifiedName, true, true);
+
+        if (ret == null) {
+            AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA);
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, schema);
+            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, 
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoSchema(this, 
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+            }
+
+            ret = createEntity(new AtlasEntityWithExtInfo(entity));
+        } else {
+            AtlasEntity entity = ret.getEntity();
+
+            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, 
TRINO_SCHEMA_CATALOG_RELATIONSHIP));
+
+            if (catalog.getConnector() != null) {
+                catalog.getConnector().connectTrinoSchema(this, 
catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
+            }
+
+            ret.setEntity(entity);
+
+            updateEntity(ret);
+        }
+
+        return ret;
+    }
+
+    /**
+     * Creates the {@code trino_table} entity (with its columns) for the table, or updates the
+     * existing one. The qualified name has the form {@code <catalog>.<schema>.<table>@<instance>}.
+     *
+     * @param catalog       catalog the table belongs to
+     * @param schema        schema name
+     * @param table         table name
+     * @param tableMetadata table-level metadata keyed by attribute name
+     * @param trinoColumns  per-column metadata keyed by column name
+     * @param schemaEntity  Atlas entity of the owning schema
+     * @return the result of the create call, or the updated table entity
+     * @throws AtlasServiceException if any Atlas call fails
+     */
+    public AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws AtlasServiceException {
+        String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(), schema, table, catalog.getInstanceName());
+
+        AtlasEntityWithExtInfo existing = findEntity(TRINO_TABLE, qualifiedName, true, true);
+        AtlasEntityWithExtInfo tableExt = toTableEntity(catalog, schema, table, tableMetadata, trinoColumns, schemaEntity, existing);
+
+        if (existing == null) {
+            return createEntity(tableExt);
+        }
+
+        updateEntity(tableExt);
+
+        return tableExt;
+    }
+
+    public void deleteByGuid(Set<String> guidTodelete) throws 
AtlasServiceException {
+        if (CollectionUtils.isNotEmpty(guidTodelete)) {
+            for (String guid : guidTodelete) {
+                EntityMutationResponse response = 
atlasClientV2.deleteEntityByGuid(guid);
+
+                if (response == null || 
response.getDeletedEntities().isEmpty()) {
+                    LOG.debug("Entity with guid : {} is not deleted", guid);
+                } else {
+                    LOG.debug("Entity with guid : {} is deleted", guid);
+                }
+            }
+        }
+    }
+
+    /**
+     * Looks up an entity by type and qualified name.
+     *
+     * @param typeName           Atlas type name
+     * @param qualifiedName      value of the qualified-name attribute
+     * @param minExtInfo         passed through to the Atlas client
+     * @param ignoreRelationship passed through to the Atlas client
+     * @return the entity, or {@code null} when Atlas answers NOT_FOUND
+     * @throws AtlasServiceException for any failure other than NOT_FOUND
+     */
+    public AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName, boolean minExtInfo, boolean ignoreRelationship) throws AtlasServiceException {
+        try {
+            return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo, ignoreRelationship);
+        } catch (AtlasServiceException e) {
+            if (e.getStatus() != ClientResponse.Status.NOT_FOUND) {
+                throw e;
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     * Closes the class-wide Atlas client, if one exists, and clears the reference to it.
+     */
+    public void close() {
+        if (atlasClientV2 == null) {
+            return;
+        }
+
+        atlasClientV2.close();
+
+        atlasClientV2 = null;
+    }
+
+    private synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration 
atlasConf) throws IOException {
+        if (atlasClientV2 == null) {
+            String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};
+
+            if (atlasConf != null && 
ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT)))
 {
+                atlasEndpoint = 
atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
+            }
+
+            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                String[] basicAuthUsernamePassword = 
AuthenticationUtil.getBasicAuthenticationInput();
+
+                atlasClientV2 = new AtlasClientV2(atlasEndpoint, 
basicAuthUsernamePassword);
+            } else {
+                UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
+
+                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), 
atlasEndpoint);
+            }
+        }
+
+        return atlasClientV2;
+    }
+
+    /**
+     * Collects all entities related to the given entity through one relationship attribute,
+     * fetching them page by page ({@code DEFAULT_PAGE_LIMIT} per request) until a short page is returned.
+     *
+     * @param entityGuid                guid of the source entity; {@code null} yields an empty list
+     * @param relationshipAttributeName relationship attribute to follow
+     * @return headers of all related entities, possibly empty
+     * @throws AtlasServiceException if a search call fails
+     */
+    private List<AtlasEntityHeader> getAllRelationshipEntities(String entityGuid, String relationshipAttributeName) throws AtlasServiceException {
+        List<AtlasEntityHeader> collected = new ArrayList<>();
+
+        if (entityGuid == null) {
+            return collected;
+        }
+
+        final int pageSize = DEFAULT_PAGE_LIMIT;
+        int       offset   = 0;
+
+        while (true) {
+            LOG.debug("Retrieving: offset={}, pageSize={}", offset, pageSize);
+
+            AtlasSearchResult       page    = atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null, null, true, pageSize, offset);
+            List<AtlasEntityHeader> headers = page != null ? page.getEntities() : null;
+            int                     fetched = headers != null ? headers.size() : 0;
+
+            if (fetched > 0) {
+                collected.addAll(headers);
+            }
+
+            if (fetched < pageSize) { // last page
+                break;
+            }
+
+            offset += pageSize;
+        }
+
+        return collected;
+    }
+
+    /**
+     * Populates a {@code trino_table} entity and one {@code trino_column} entity per column.
+     * The table is linked to its schema and columns, the catalog's connector (if any) is invoked,
+     * and the schema and column entities are added as referred entities.
+     *
+     * @param catalog        catalog the table belongs to
+     * @param schema         schema name
+     * @param table          table name
+     * @param tableMetadata  table-level metadata; only {@code table_type} is read, and a missing value is stored as {@code null}
+     * @param trinoColumns   per-column metadata keyed by column name
+     * @param schemaEntity   Atlas entity of the owning schema
+     * @param tableEntityExt existing table entity to update, or {@code null} to start from a new one
+     * @return the populated table entity with its referred entities
+     */
+    private AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, AtlasEntityWithExtInfo tableEntityExt) {
+        if (tableEntityExt == null) {
+            tableEntityExt = new AtlasEntityWithExtInfo(new AtlasEntity(TRINO_TABLE));
+        }
+
+        AtlasEntity       tableEntity    = tableEntityExt.getEntity();
+        List<AtlasEntity> columnEntities = new ArrayList<>();
+        String            qualifiedName  = String.format("%s.%s.%s@%s", catalog.getName(), schema, table, catalog.getInstanceName());
+
+        // A missing table_type used to fail with a NullPointerException on toString(); store null instead.
+        Object tableType = tableMetadata != null ? tableMetadata.get(TRINO_TABLE_TYPE) : null;
+
+        tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
+        tableEntity.setAttribute(NAME_ATTRIBUTE, table);
+        tableEntity.setAttribute(TRINO_TABLE_TYPE, tableType != null ? tableType.toString() : null);
+
+        for (Map.Entry<String, Map<String, Object>> columnEntry : trinoColumns.entrySet()) {
+            AtlasEntity entity           = new AtlasEntity(TRINO_COLUMN);
+            String      columnName       = columnEntry.getKey();
+            String      colQualifiedName = String.format("%s.%s.%s.%s@%s", catalog.getName(), schema, table, columnName, catalog.getInstanceName());
+
+            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName);
+            entity.setAttribute(NAME_ATTRIBUTE, columnName);
+
+            if (MapUtils.isNotEmpty(columnEntry.getValue())) {
+                Map<String, Object> columnAttr = columnEntry.getValue();
+
+                entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE));
+                entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE));
+            }
+
+            entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity, TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+            columnEntities.add(entity);
+        }
+
+        tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity, TRINO_TABLE_SCHEMA_RELATIONSHIP));
+        tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities, TRINO_TABLE_COLUMN_RELATIONSHIP));
+
+        // The connector runs before the referred entities are attached, as in the original ordering.
+        if (catalog.getConnector() != null) {
+            catalog.getConnector().connectTrinoTable(this, catalog.getHookInstanceName(), catalog.getName(), schema, table, tableEntity, columnEntities, tableEntityExt);
+        }
+
+        tableEntityExt.addReferredEntity(schemaEntity);
+
+        for (AtlasEntity column : columnEntities) {
+            tableEntityExt.addReferredEntity(column);
+        }
+
+        return tableEntityExt;
+    }
+
+    /**
+     * Creates the entity in Atlas and re-reads everything Atlas reports as created.
+     * The first created entity becomes the returned entity; every further created entity not
+     * already present in the result is added (with its own referred entities) as a referred
+     * entity. Relationship attributes are cleared on the result before it is returned.
+     *
+     * @param entity entity (with referred entities) to create
+     * @return the created entity as stored in Atlas, or {@code null} if Atlas reports no creations
+     * @throws AtlasServiceException if an Atlas call fails
+     */
+    private AtlasEntityWithExtInfo createEntity(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
+        LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);
+
+        EntityMutationResponse  response        = atlasClientV2.createEntity(entity);
+        List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);
+        AtlasEntityWithExtInfo  ret             = null;
+
+        if (CollectionUtils.isNotEmpty(createdEntities)) {
+            for (AtlasEntityHeader createdEntity : createdEntities) {
+                if (ret == null) {
+                    ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+                    LOG.debug("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
+
+                    continue;
+                }
+
+                if (ret.getEntity(createdEntity.getGuid()) != null) {
+                    continue; // already part of the result
+                }
+
+                AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());
+
+                ret.addReferredEntity(newEntity.getEntity());
+
+                if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
+                    for (Map.Entry<String, AtlasEntity> referred : newEntity.getReferredEntities().entrySet()) {
+                        ret.addReferredEntity(referred.getKey(), referred.getValue());
+                    }
+                }
+
+                LOG.debug("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
+            }
+        }
+
+        clearRelationshipAttributes(ret);
+
+        return ret;
+    }
+
+    /**
+     * Sends the entity to Atlas as an update, logging it before and after the call.
+     *
+     * @param entity entity (with referred entities) to update
+     * @throws AtlasServiceException if the update call fails
+     */
+    private void updateEntity(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
+        LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);
+
+        atlasClientV2.updateEntity(entity);
+
+        AtlasEntity updated = entity.getEntity();
+
+        LOG.debug("Updated {} entity: name={}, guid={}", updated.getTypeName(), updated.getAttribute(ATTRIBUTE_QUALIFIED_NAME), updated.getGuid());
+    }
+
+    /**
+     * Clears the relationship attributes of the entity and of all its referred entities.
+     *
+     * @param entity entity with referred entities; {@code null} is ignored
+     */
+    private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
+        if (entity == null) {
+            return;
+        }
+
+        clearRelationshipAttributes(entity.getEntity());
+
+        if (entity.getReferredEntities() != null) {
+            clearRelationshipAttributes(entity.getReferredEntities().values());
+        }
+    }
+
+    /**
+     * Clears the relationship attributes of every entity in the collection.
+     *
+     * @param entities entities to process; {@code null} is ignored
+     */
+    private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
+        if (entities == null) {
+            return;
+        }
+
+        for (AtlasEntity each : entities) {
+            clearRelationshipAttributes(each);
+        }
+    }
+
+    /**
+     * Clears the relationship-attribute map of a single entity, if it has one.
+     *
+     * @param entity entity to process; {@code null} is ignored
+     */
+    private void clearRelationshipAttributes(AtlasEntity entity) {
+        if (entity == null) {
+            return;
+        }
+
+        Map<String, Object> relationshipAttributes = entity.getRelationshipAttributes();
+
+        if (relationshipAttributes != null) {
+            relationshipAttributes.clear();
+        }
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
new file mode 100644
index 000000000..fcb2595a8
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TrinoClientHelper {
+    private final String jdbcUrl;
+    private final String username;
+    private final String password;
+
+    public TrinoClientHelper(Configuration atlasConf) {
+        this.jdbcUrl  = atlasConf.getString("atlas.trino.jdbc.address");
+        this.username = atlasConf.getString("atlas.trino.jdbc.user");
+        this.password = atlasConf.getString("atlas.trino.jdbc.password", "");
+    }
+
+    public Map<String, String> getAllTrinoCatalogs() {
+        Map<String, String> catalogs = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+                Statement stmt = connection.createStatement()) {
+            String    query = "SELECT catalog_name, connector_name FROM 
system.metadata.catalogs";
+            ResultSet rs    = stmt.executeQuery(query);
+
+            while (rs.next()) {
+                catalogs.put(rs.getString("catalog_name"), 
rs.getString("connector_name"));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        return catalogs;
+    }
+
+    public List<String> getTrinoSchemas(String catalog, String schemaToImport) 
throws SQLException {
+        List<String> schemas = new ArrayList<>();
+
+        try (Connection connection = getTrinoConnection();
+                Statement stmt = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT schema_name FROM 
").append(catalog).append(".information_schema.schemata");
+
+            if (StringUtils.isNotEmpty(schemaToImport)) {
+                query.append(" where schema_name = 
'").append(schemaToImport).append("'");
+            }
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                schemas.add(rs.getString("schema_name"));
+            }
+        }
+
+        return schemas;
+    }
+
+    public Map<String, Map<String, Object>> getTrinoTables(String catalog, 
String schema, String tableToImport) throws SQLException {
+        Map<String, Map<String, Object>> tables = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+                Statement stmt = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT table_name, table_type FROM 
").append(catalog).append(".information_schema.tables WHERE table_schema = 
'").append(schema).append("'");
+
+            if (StringUtils.isNotEmpty(tableToImport)) {
+                query.append(" and table_name = 
'").append(tableToImport).append("'");
+            }
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                Map<String, Object> tableMetadata = new HashMap<>();
+
+                tableMetadata.put("table_name", rs.getString("table_name"));
+                tableMetadata.put("table_type", rs.getString("table_type"));
+
+                tables.put(rs.getString("table_name"), tableMetadata);
+            }
+        }
+
+        return tables;
+    }
+
+    public Map<String, Map<String, Object>> getTrinoColumns(String catalog, 
String schema, String table) throws SQLException {
+        Map<String, Map<String, Object>> columns = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+                Statement stmt = connection.createStatement()) {
+            StringBuilder query = new StringBuilder();
+
+            query.append("SELECT column_name, ordinal_position, 
column_default, is_nullable, data_type FROM 
").append(catalog).append(".information_schema.columns WHERE table_schema = 
'").append(schema).append("' AND table_name = '").append(table).append("'");
+
+            ResultSet rs = stmt.executeQuery(query.toString());
+
+            while (rs.next()) {
+                Map<String, Object> columnMetadata = new HashMap<>();
+
+                columnMetadata.put("ordinal_position", 
rs.getInt("ordinal_position"));
+                columnMetadata.put("column_default", 
rs.getString("column_default"));
+                columnMetadata.put("column_name", rs.getString("column_name"));
+
+                if (StringUtils.isNotEmpty(rs.getString("is_nullable"))) {
+                    if 
(StringUtils.equalsIgnoreCase(rs.getString("is_nullable"), "YES")) {
+                        columnMetadata.put("is_nullable", true);
+                    } else {
+                        columnMetadata.put("is_nullable", false);
+                    }
+                }
+
+                columnMetadata.put("data_type", rs.getString("data_type"));
+
+                columns.put(rs.getString("column_name"), columnMetadata);
+            }
+        }
+
+        return columns;
+    }
+
+    private Connection getTrinoConnection() throws SQLException {
+        return DriverManager.getConnection(jdbcUrl, username, password);
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
new file mode 100644
index 000000000..fd6bf5974
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+
+import java.util.List;
+
+/**
+ * Extension point invoked while Trino catalog, schema and table entities are being built,
+ * giving a connector-specific implementation the chance to modify them before they are saved.
+ */
+public abstract class AtlasEntityConnector {
+    /**
+     * Invoked for a Trino catalog entity before it is created or updated.
+     *
+     * @param atlasClient       helper for talking to Atlas
+     * @param instanceName      hook instance name of the catalog
+     * @param catalogName       Trino catalog name
+     * @param entity            the catalog entity being built
+     * @param entityWithExtInfo the existing entity with ext info; may be {@code null} when the entity is new
+     */
+    public abstract void connectTrinoCatalog(AtlasClientHelper atlasClient, String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    /**
+     * Invoked for a Trino schema entity before it is created or updated.
+     *
+     * @param atlasClient       helper for talking to Atlas
+     * @param instanceName      hook instance name of the catalog
+     * @param catalogName       Trino catalog name
+     * @param schemaName        Trino schema name
+     * @param entity            the schema entity being built
+     * @param entityWithExtInfo the existing entity with ext info; may be {@code null} when the entity is new
+     */
+    public abstract void connectTrinoSchema(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    /**
+     * Invoked for a Trino table entity and its columns before they are created or updated.
+     *
+     * @param atlasClient       helper for talking to Atlas
+     * @param instanceName      hook instance name of the catalog
+     * @param catalogName       Trino catalog name
+     * @param schemaName        Trino schema name
+     * @param tableName         Trino table name
+     * @param entity            the table entity being built
+     * @param columnEntities    the column entities of the table
+     * @param entityWithExtInfo the table entity together with its ext info
+     */
+    public abstract void connectTrinoTable(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
new file mode 100644
index 000000000..e7e9d2664
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static factory mapping a Trino connector type name to its {@link AtlasEntityConnector}.
+ */
+public class ConnectorFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectorFactory.class);
+
+    /**
+     * Returns the connector for the given type name, matched case-insensitively.
+     *
+     * @param connectorType connector type name, e.g. {@code "hive"} or {@code "iceberg"}; may be {@code null}
+     * @return a new connector, or {@code null} (with a warning logged) for a null or unrecognized type
+     */
+    public static AtlasEntityConnector getConnector(String connectorType) {
+        // Locale.ROOT keeps the match independent of the JVM default locale (e.g. Turkish dotless i).
+        String normalizedType = connectorType == null ? "" : connectorType.toLowerCase(java.util.Locale.ROOT);
+
+        switch (normalizedType) {
+            case "iceberg":
+                return new IcebergEntityConnector();
+            case "hive":
+                return new HiveEntityConnector();
+            default:
+                LOG.warn("{}: unrecognized connector type", connectorType);
+
+                return null;
+        }
+    }
+
+    private ConnectorFactory() {
+        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
new file mode 100644
index 000000000..7f75f6da6
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class HiveEntityConnector extends AtlasEntityConnector {
+    public static final String HIVE_DB                               = 
"hive_db";
+    public static final String HIVE_TABLE                            = 
"hive_table";
+    public static final String HIVE_COLUMN                           = 
"hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP     = 
"trino_schema_hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP   = 
"trino_table_hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP = 
"trino_column_hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE        = 
"hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE      = 
"hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE    = 
"hive_column";
+    private static final Logger LOG = 
LoggerFactory.getLogger(HiveEntityConnector.class);
+
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo 
entityWithExtInfo) {
+    }
+
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, 
schemaName);
+
+                if (hiveDb != null) {
+                    
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, 
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, String tableName, 
AtlasEntity trinoTable, List<AtlasEntity> columnEntities, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveTable = toTableEntity(atlasClient, 
instanceName, schemaName, tableName);
+
+                if (hiveTable != null) {
+                    
trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, 
TRINO_TABLE_HIVE_TABLE_RELATIONSHIP));
+
+                    for (AtlasEntity columnEntity : columnEntities) {
+                        connectTrinoColumn(atlasClient, instanceName, 
schemaName, tableName, columnEntity);
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) 
throws AtlasServiceException {
+        if (instanceName != null) {
+            try {
+                AtlasEntity hiveColumn = toColumnEntity(atlasClient, 
instanceName, schemaName, tableName, 
trinoColumn.getAttribute("name").toString());
+
+                if (hiveColumn != null) {
+                    
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn, 
TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                throw new AtlasServiceException(e);
+            }
+        }
+    }
+
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + 
instanceName;
+        AtlasEntityWithExtInfo ret             = 
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName) throws AtlasServiceException 
{
+        String                 tableQualifiedName = schemaName + "." + 
tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = 
atlasClient.findEntity(HIVE_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, String columnName) throws 
AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + 
tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = 
atlasClient.findEntity(HIVE_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
new file mode 100644
index 000000000..8c99f36f4
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class IcebergEntityConnector extends AtlasEntityConnector {
+    public static final String HIVE_DB                                  = 
"hive_db";
+    public static final String ICEBERG_TABLE                            = 
"iceberg_table";
+    public static final String ICEBERG_COLUMN                           = 
"iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP        = 
"trino_schema_hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP   = 
"trino_table_iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP = 
"trino_column_iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE           = 
"hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE      = 
"iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE    = 
"iceberg_column";
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergEntityConnector.class);
+
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo 
entityWithExtInfo) {
+    }
+
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, 
schemaName);
+
+                if (hiveDb != null) {
+                    
dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, 
TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String 
instanceName, String catalogName, String schemaName, String tableName, 
AtlasEntity trinoTable, List<AtlasEntity> columnEntities, 
AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace 
is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity icebergTable = toTableEntity(atlasClient, 
instanceName, schemaName, tableName);
+
+                if (icebergTable != null) {
+                    
trinoTable.setRelationshipAttribute(TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(icebergTable, 
TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP));
+
+                    for (AtlasEntity columnEntity : columnEntities) {
+                        connectTrinoColumn(atlasClient, instanceName, 
schemaName, tableName, columnEntity);
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) 
throws AtlasServiceException {
+        if (instanceName != null) {
+            try {
+                AtlasEntity icebergColumn = toColumnEntity(atlasClient, 
instanceName, schemaName, tableName, 
trinoColumn.getAttribute("name").toString());
+
+                if (icebergColumn != null) {
+                    
trinoColumn.setRelationshipAttribute(TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE, 
AtlasTypeUtil.getAtlasRelatedObjectId(icebergColumn, 
TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                throw new AtlasServiceException(e);
+            }
+        }
+    }
+
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + 
instanceName;
+        AtlasEntityWithExtInfo ret             = 
atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName) throws AtlasServiceException 
{
+        String                 tableQualifiedName = schemaName + "." + 
tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = 
atlasClient.findEntity(ICEBERG_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String 
instanceName, String schemaName, String tableName, String columnName) throws 
AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + 
tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = 
atlasClient.findEntity(ICEBERG_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git 
a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
new file mode 100644
index 000000000..cfeca6c78
--- /dev/null
+++ 
b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.model;
+
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.connector.AtlasEntityConnector;
+import org.apache.atlas.trino.connector.ConnectorFactory;
+
+/**
+ * Holder for one Trino catalog: its name and type, the Trino and hook instance
+ * names, the connector resolved for its type, and the mutable import settings
+ * (Trino instance entity, schema and table to import).
+ */
+public class Catalog {
+    private final String                 name;
+    private final String                 type;
+    private final String                 hookInstanceName;
+    private final String                 instanceName;
+    private final AtlasEntityConnector   connector;
+    private       AtlasEntityWithExtInfo trinoInstanceEntity;
+    private       String                 schemaToImport;
+    private       String                 tableToImport;
+
+    /**
+     * Creates a catalog descriptor.
+     *
+     * @param name             catalog name
+     * @param type             catalog type, passed to {@link ConnectorFactory#getConnector(String)} when the hook is enabled
+     * @param hookEnabled      when {@code false} no connector is resolved and {@link #getConnector()} returns {@code null}
+     * @param hookInstanceName instance name associated with the hook
+     * @param instanceName     instance name of the catalog
+     */
+    public Catalog(String name, String type, boolean hookEnabled, String hookInstanceName, String instanceName) {
+        this.name             = name;
+        this.type             = type;
+        this.hookInstanceName = hookInstanceName;
+        this.instanceName     = instanceName;
+
+        if (hookEnabled) {
+            this.connector = ConnectorFactory.getConnector(type);
+        } else {
+            this.connector = null;
+        }
+    }
+
+    /** @return the catalog name */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @return the catalog type */
+    public String getType() {
+        return this.type;
+    }
+
+    /** @return the connector resolved at construction time, or {@code null} if none was resolved */
+    public AtlasEntityConnector getConnector() {
+        return this.connector;
+    }
+
+    /** @return the hook instance name given at construction time */
+    public String getHookInstanceName() {
+        return this.hookInstanceName;
+    }
+
+    /** @return the instance name given at construction time */
+    public String getInstanceName() {
+        return this.instanceName;
+    }
+
+    /** @return the Trino instance entity, or {@code null} if it has not been set */
+    public AtlasEntityWithExtInfo getTrinoInstanceEntity() {
+        return this.trinoInstanceEntity;
+    }
+
+    /** @param trinoInstanceEntity the Trino instance entity to store */
+    public void setTrinoInstanceEntity(AtlasEntityWithExtInfo trinoInstanceEntity) {
+        this.trinoInstanceEntity = trinoInstanceEntity;
+    }
+
+    /** @return the table to import, or {@code null} if it has not been set */
+    public String getTableToImport() {
+        return this.tableToImport;
+    }
+
+    /** @param tableToImport the table to import */
+    public void setTableToImport(String tableToImport) {
+        this.tableToImport = tableToImport;
+    }
+
+    /** @return the schema to import, or {@code null} if it has not been set */
+    public String getSchemaToImport() {
+        return this.schemaToImport;
+    }
+
+    /** @param schemaToImport the schema to import */
+    public void setSchemaToImport(String schemaToImport) {
+        this.schemaToImport = schemaToImport;
+    }
+}
diff --git 
a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
 
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
new file mode 100644
index 000000000..e0e74ff9b
--- /dev/null
+++ 
b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+public class TrinoExtractorIT {
+    /* Placeholder: this class contains no test methods. List of test cases:
+    Invalid arguments
+    Invalid cron expression
+    Test valid catalog to be run
+    Test instance creation
+    Test catalog creation
+    Test schema creation
+    Test table creation
+    Test that, if hook is enabled, the hook entity (if created) is connected to the Trino entity
+    Test cron doesn't trigger a new job before the earlier thread completes
+    Test without cron expression
+    Test that a catalog that is not registered is still run if passed from the command line
+    Deleted table
+    Deleted catalog
+    Deleted column
+    Deleted schema
+    Rename catalog
+    Rename schema
+    Tag propagated */
+}
diff --git a/distro/pom.xml b/distro/pom.xml
index af0d01060..d7e958889 100644
--- a/distro/pom.xml
+++ b/distro/pom.xml
@@ -276,6 +276,7 @@ atlas.graph.storage.hbase.regions-per-server=1
                                         
<!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>-->
                                         
<descriptor>src/main/assemblies/classification-updater.xml</descriptor>
                                         
<descriptor>src/main/assemblies/notification-analyzer.xml</descriptor>
+                                        
<descriptor>src/main/assemblies/atlas-trino-extractor.xml</descriptor>
                                     </descriptors>
                                     
<finalName>apache-atlas-${project.version}</finalName>
                                     <tarLongFileMode>gnu</tarLongFileMode>
diff --git a/distro/src/main/assemblies/atlas-trino-extractor.xml 
b/distro/src/main/assemblies/atlas-trino-extractor.xml
new file mode 100644
index 000000000..bffffd67b
--- /dev/null
+++ b/distro/src/main/assemblies/atlas-trino-extractor.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <formats>
+        <format>tar.gz</format>
+    </formats>
+    <id>trino-extractor</id>
+    
<baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory>
+        <fileSets>
+            <fileSet>
+                <includes>
+                    <include>README*</include>
+                </includes>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/src/main/conf</directory>
+                <outputDirectory>/conf</outputDirectory>
+                <includes>
+                    <include>atlas-trino-extractor-logback.xml</include>
+                    <include>atlas-trino-extractor.properties</include>
+                </includes>
+                <fileMode>0755</fileMode>
+                <directoryMode>0755</directoryMode>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/src/main/bin</directory>
+                <outputDirectory>/bin</outputDirectory>
+                <includes>
+                    <include>run-trino-extractor.sh</include>
+                </includes>
+                <fileMode>0755</fileMode>
+                <directoryMode>0755</directoryMode>
+            </fileSet>
+            <fileSet>
+                
<directory>../addons/trino-extractor/target/dependency/trino</directory>
+                <outputDirectory>/lib</outputDirectory>
+            </fileSet>
+            <fileSet>
+                <directory>../addons/trino-extractor/target</directory>
+                <outputDirectory>/lib</outputDirectory>
+                <includes>
+                    <include>atlas-trino-extractor-*.jar</include>
+                </includes>
+            </fileSet>
+        </fileSets>
+
+</assembly>
diff --git a/docs/src/documents/Tools/TrinoExtractor.md 
b/docs/src/documents/Tools/TrinoExtractor.md
new file mode 100644
index 000000000..cf660f643
--- /dev/null
+++ b/docs/src/documents/Tools/TrinoExtractor.md
@@ -0,0 +1,149 @@
+---
+name: Trino Extractor
+route: /TrinoExtractor
+menu: Documentation
+submenu: Tools
+---
+
+import  themen  from 'theme/styles/styled-colors';
+import  * as theme  from 'react-syntax-highlighter/dist/esm/styles/hljs';
+import SyntaxHighlighter from 'react-syntax-highlighter';
+
+# Trino Extractor
+
+## Overview
+
+The Trino Extractor is a comprehensive metadata extraction utility designed 
for Apache Atlas integration with Trino. It provides discovery, extraction, and 
synchronization of Trino metadata including catalogs, schemas, tables, and 
columns into Apache Atlas for enhanced data governance and metadata management.
+
+## Key Features
+
+### Metadata Extraction
+* **Comprehensive Discovery**: Automatically discovers and extracts metadata 
from Trino catalogs, schemas, tables, and columns
+* **JDBC-Based Connection**: Uses standard Trino JDBC driver for reliable 
connectivity
+* **Selective Extraction**: Supports extraction for specific catalog, schema, 
or table names
+
+### Atlas Integration
+* **Entity Management**: Creates and updates Atlas entities for Trino metadata 
objects
+* **Relationship Mapping**: Establishes proper hierarchical relationships 
between catalogs, schemas, tables, and columns
+* **Synchronization**: Maintains consistency by removing Atlas entities that 
no longer exist in Trino
+* **Connector Support**: Specialized handling for Trino connectors whose 
metadata Atlas already captures through a dedicated hook, such as Hive and Iceberg
+
+### Scheduling & Automation
+* **Cron-based Scheduling**: Supports automated periodic extraction using cron 
expressions
+* **One-time Execution**: Can be run as a single extraction job
+* **Error Handling**: Robust error handling with detailed logging
+
+## Architecture
+
+<SyntaxHighlighter language="text" style={theme.github}>
+{`
+┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
+│   Trino Cluster │    │ Trino Extractor │    │  Apache Atlas   │
+│                 │    │                 │    │                 │
+│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
+│ │  Catalogs   │◄┼────┼►│ JDBC Client │ │    │ │  Entities   │ │
+│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
+│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
+│ │   Schemas   │ │    │ │ Extraction  │◄┼────┼►│Relationships│ │
+│ └─────────────┘ │    │ │   Service   │ │    │ └─────────────┘ │
+│ ┌─────────────┐ │    │ └─────────────┘ │    │ ┌─────────────┐ │
+│ │   Tables    │ │    │ ┌─────────────┐ │    │ │  Lineage    │ │
+│ └─────────────┘ │    │ │Atlas Client │◄┼────┼►│    Data     │ │
+│ ┌─────────────┐ │    │ └─────────────┘ │    │ └─────────────┘ │
+│ │   Columns   │ │    │                 │    │                 │
+│ └─────────────┘ │    └─────────────────┘    └─────────────────┘
+└─────────────────┘                           
+`}
+</SyntaxHighlighter>
+
+## Quick Start
+
+### 1. Configuration Setup
+
+Configure the `atlas-trino-extractor.properties` file:
+
+<SyntaxHighlighter language="properties" style={theme.github}>
+{`
+# Atlas connection
+atlas.rest.address=http://localhost:21000/
+# Trino connection
+atlas.trino.jdbc.address=jdbc:trino://localhost:8080/
+atlas.trino.jdbc.user=your-username
+# Catalogs to extract
+atlas.trino.catalogs.registered=hive_catalog,iceberg_catalog
+`}
+</SyntaxHighlighter>
+
+### 2. Basic Execution
+
+<SyntaxHighlighter language="bash" style={theme.github}>
+{`
+# Extract all registered catalogs
+./bin/run-trino-extractor.sh
+# Extract specific catalog
+./bin/run-trino-extractor.sh -c my_catalog
+# Schedule periodic extraction (every 6 hours)
+./bin/run-trino-extractor.sh -cx "0 0 */6 * * ?"
+`}
+</SyntaxHighlighter>
+
+## Configuration Properties
+
+| Property | Description | Default | Example |
+|----------|-------------|---------|---------|
+| `atlas.rest.address` | Atlas REST API endpoint | `http://localhost:21000/` | 
`https://atlas.company.com:21443/` |
+| `atlas.trino.jdbc.address` | Trino JDBC URL | - | 
`jdbc:trino://trino-server:8080/` |
+| `atlas.trino.jdbc.user` | Trino username | - | `admin` |
+| `atlas.trino.jdbc.password` | Trino password | `""` | `password123` |
+| `atlas.trino.namespace` | Trino instance namespace | `cm` | 
`production-cluster` |
+| `atlas.trino.catalogs.registered` | Catalogs to extract | - | 
`hive,iceberg,mysql` |
+| `atlas.trino.catalog.hook.enabled.<catalog-name>` | Whether an Atlas hook is 
enabled for this catalog | `false` | `true` |
+| `atlas.trino.catalog.hook.enabled.<catalog-name>.namespace` | Namespace 
used in Atlas by this catalog's hook | `cm` | `cm` |
+| `atlas.trino.extractor.schedule` | Cron expression | - | `0 0 2 * * ?` |
+
+## Command Line Usage
+
+### Available Options
+
+| Option | Long Form | Description | Example |
+|--------|-----------|-------------|---------|
+| `-c` | `--catalog` | Extract specific catalog | `-c hive_catalog` |
+| `-s` | `--schema` | Extract specific schema | `-s sales_data` |
+| `-t` | `--table` | Extract specific table | `-t customer_orders` |
+| `-cx` | `--cronExpression` | Schedule with cron expression | `-cx "0 0 2 * * 
?"` |
+| `-h` | `--help` | Display help information | `-h` |
+
+## Connector-Specific Processing
+
+#### Example: Hive Connector Integration
+<SyntaxHighlighter language="properties" style={theme.github}>
+{`
+# Enable Hive hook integration
+atlas.trino.catalog.hook.enabled.hive_catalog=true
+atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm
+`}
+</SyntaxHighlighter>
+
+#### Benefits:
+- Links Trino entities with existing Hive entities
+- Maintains consistency between Hive and Trino metadata
+- Supports environments with Atlas Hive hooks
+
+## FAQ
+
+### Troubleshooting Questions
+
+**Q: Why are some entities not appearing in Atlas?**
+
+A: Check catalog registration, permissions, and network connectivity. Review 
logs for specific errors.
+
+**Q: How do I handle large clusters with thousands of tables?**
+
+A: Use selective extraction, increase memory allocation, schedule during 
off-peak hours, and process catalogs individually.
+
+### Documentation
+- [Apache Atlas Documentation](https://atlas.apache.org/#/)
+- [Trino Documentation](https://trino.io/docs/)
+- [Atlas REST API Reference](https://atlas.apache.org/api/v2/)
+
+
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index c9fa2e4cd..a1c6cf87e 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -87,7 +87,7 @@ public class AtlasJanusGraphManagement implements 
AtlasGraphManagement {
     private final AtlasJanusGraph      graph;
     private final JanusGraphManagement management;
     private final Set<String>          newMultProperties = new HashSet<>();
-    private       boolean              isSuccess         = false;
+    private       boolean              isSuccess;
 
     public AtlasJanusGraphManagement(AtlasJanusGraph graph, 
JanusGraphManagement managementSystem) {
         this.management = managementSystem;
diff --git a/pom.xml b/pom.xml
index 9341140d4..ac2d4df6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>addons/sqoop-bridge-shim</module>
         <module>addons/storm-bridge</module>
         <module>addons/storm-bridge-shim</module>
+        <module>addons/trino-extractor</module>
         <module>atlas-examples</module>
         <module>authorization</module>
         <module>build-tools</module>
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 191dea810..d6aaab025 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -20,7 +20,6 @@ package org.apache.atlas.repository.graph;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.discovery.SearchIndexer;
 import org.apache.atlas.exception.AtlasBaseException;
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index cf364a01d..0f9a4831e 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -22,7 +22,6 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.pc.WorkItemManager;
 import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
 import org.apache.atlas.repository.graphdb.AtlasCardinality;
 import org.apache.atlas.repository.graphdb.AtlasGraph;

Reply via email to