This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch ATLAS-5021_u in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 9627a0522fc8be1a1a2e3ea16bdf32f7ca1d0cc0 Author: Pinal Shah <[email protected]> AuthorDate: Mon Aug 18 11:34:23 2025 +0530 ATLAS-5021: Extract Metadata from Trino periodically --- addons/models/6000-Trino/6000-trino_model.json | 185 +++++++++ addons/trino-extractor/pom.xml | 96 +++++ .../src/main/bin/run-trino-extractor.sh | 138 +++++++ .../src/main/conf/atlas-application.properties | 32 ++ .../trino-extractor/src/main/conf/atlas-log4j.xml | 42 ++ .../apache/atlas/trino/cli/ExtractorContext.java | 106 +++++ .../apache/atlas/trino/cli/ExtractorService.java | 352 +++++++++++++++++ .../org/apache/atlas/trino/cli/TrinoExtractor.java | 229 +++++++++++ .../atlas/trino/client/AtlasClientHelper.java | 438 +++++++++++++++++++++ .../atlas/trino/client/TrinoClientHelper.java | 151 +++++++ .../trino/connector/AtlasEntityConnector.java | 31 ++ .../atlas/trino/connector/ConnectorFactory.java | 38 ++ .../atlas/trino/connector/HiveEntityConnector.java | 119 ++++++ .../trino/connector/IcebergEntityConnector.java | 119 ++++++ .../java/org/apache/atlas/trino/model/Catalog.java | 85 ++++ .../src/main/resources/atlas-logback.xml | 51 +++ .../apache/atlas/trino/cli/TrinoExtractorIT.java | 42 ++ distro/pom.xml | 1 + .../src/main/assemblies/atlas-trino-extractor.xml | 65 +++ pom.xml | 1 + 20 files changed, 2321 insertions(+) diff --git a/addons/models/6000-Trino/6000-trino_model.json b/addons/models/6000-Trino/6000-trino_model.json new file mode 100644 index 000000000..734038530 --- /dev/null +++ b/addons/models/6000-Trino/6000-trino_model.json @@ -0,0 +1,185 @@ +{ + "entityDefs": [ + { + "name": "trino_instance", + "superTypes": [ "DataSet" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "hostname", "typeName": "string", "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false }, + { "name": "port", "typeName": "int", "isOptional": true, "cardinality": "SINGLE", "isUnique": false, "isIndexable": false } + ] + }, + { 
+ "name": "trino_catalog", + "superTypes": [ "Asset" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "connectorType", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false }, + { "name": "connectionUrl", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true } + ] + }, + { + "name": "trino_schema", + "superTypes": [ "Asset" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "parameters", "typeName": "map<string,string>", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true } + ] + }, + { + "name": "trino_table", + "superTypes": [ "DataSet" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "comment", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }, + { "name": "parameters", "typeName": "map<string,string>", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }, + { "name": "table_type", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true } + ] + }, + { + "name": "trino_column", + "superTypes": [ "DataSet" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "data_type", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": true, "isOptional": true }, + { "name": "ordinal_position", "typeName": "int", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }, + { "name": "column_default", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }, + { "name": "comment", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true }, + { "name": "is_nullable", "typeName": "boolean", "cardinality": "SINGLE", "isUnique": 
false, "isIndexable": false, "isOptional": true }, + { "name": "isPrimaryKey", "typeName": "boolean", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true } + ] + }, + { + "name": "trino_column_lineage", + "superTypes": [ "Process" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "expression", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "isOptional": true } + ] + }, + { + "name": "trino_process", + "superTypes": [ "Process" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ + { "name": "queryId", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }, + { "name": "queryCreateTime", "typeName": "long", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }, + { "name": "queryEndTime", "typeName": "long", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 }, + { "name": "queryText", "typeName": "string", "cardinality": "SINGLE", "isUnique": false, "isIndexable": false, "searchWeight": 5 } + ] + }, + { + "name": "trino_schema_ddl", + "superTypes": [ "ddl" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ ] + }, + { + "name": "trino_table_ddl", + "superTypes": [ "ddl" ], + "serviceType": "trino", + "typeVersion": "1.0", + "attributeDefs": [ ] + } + ], + "relationshipDefs": [ + { + "name": "trino_instance_catalog", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_catalog", "name": "instance", "isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false }, + "endDef2": { "type": "trino_instance", "name": "catalogs", "isContainer": true, "cardinality": "SET" } + }, + { + "name": "trino_schema_catalog", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": 
"AGGREGATION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_schema", "name": "catalog", "isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false }, + "endDef2": { "type": "trino_catalog", "name": "schemas", "isContainer": true, "cardinality": "SET" } + }, + { + "name": "trino_table_schema", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "AGGREGATION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_table", "name": "trinoschema", "isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false }, + "endDef2": { "type": "trino_schema", "name": "tables", "isContainer": true, "cardinality": "SET" } + }, + { + "name": "trino_table_columns", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_table", "name": "columns", "isContainer": true, "cardinality": "SET", "isLegacyAttribute": false }, + "endDef2": { "type": "trino_column", "name": "table", "isContainer": false, "cardinality": "SINGLE", "isLegacyAttribute": false } + }, + { + "name": "trino_table_ddl_queries", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_table", "name": "ddlQueries", "isContainer": true, "cardinality": "SET" }, + "endDef2": { "type": "trino_table_ddl", "name": "table", "isContainer": false, "cardinality": "SINGLE" } + }, + { + "name": "trino_schema_ddl_queries", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + "propagateTags": "NONE", + "endDef1": { "type": "trino_schema", "name": "ddlQueries", "isContainer": true, "cardinality": "SET" }, + "endDef2": { "type": "trino_schema_ddl", "name": "db", "isContainer": false, "cardinality": "SINGLE" } + }, + { + "name": "trino_process_column_lineage", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + 
"propagateTags": "NONE", + "endDef1": { "type": "trino_column_lineage", "name": "query", "isContainer": false, "cardinality": "SINGLE" }, + "endDef2": { "type": "trino_process", "name": "columnLineages", "isContainer": true, "cardinality": "SET" } + }, + { + "name": "trino_schema_hive_db", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "ASSOCIATION", + "propagateTags": "BOTH", + "endDef1": { "type": "hive_db", "name": "trino_schema", "cardinality": "SET" }, + "endDef2": { "type": "trino_schema", "name": "hive_db", "cardinality": "SINGLE" } + }, + { + "name": "trino_table_hive_table", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "ASSOCIATION", + "propagateTags": "BOTH", + "endDef1": { "type": "hive_table", "name": "trino_table", "cardinality": "SET" }, + "endDef2": { "type": "trino_table", "name": "hive_table", "cardinality": "SINGLE" } + }, + { + "name": "trino_column_hive_column", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "ASSOCIATION", + "propagateTags": "BOTH", + "endDef1": { "type": "hive_column", "name": "trino_column", "cardinality": "SET" }, + "endDef2": { "type": "trino_column", "name": "hive_column", "cardinality": "SINGLE" } + } + ] +} diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml new file mode 100644 index 000000000..70dcbcd5f --- /dev/null +++ b/addons/trino-extractor/pom.xml @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.atlas</groupId> + <artifactId>apache-atlas</artifactId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + + <artifactId>atlas-trino-extractor</artifactId> + <packaging>jar</packaging> + + <name>Apache Atlas Trino Bridge</name> + <description>Apache Atlas Trino Bridge Module</description> + + <properties> + <checkstyle.failOnViolation>true</checkstyle.failOnViolation> + <checkstyle.skip>false</checkstyle.skip> + </properties> + + <dependencies> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.9</version> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-jdbc</artifactId> + <version>403</version> + <!-- java8 supported version --> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-intg</artifactId> + </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>2.3.2</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> 
+ <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>package</phase> + <configuration> + <excludeScope>test</excludeScope> + <includeScope>compile</includeScope> + <outputDirectory>${project.build.directory}/dependency/trino</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh new file mode 100755 index 000000000..a7d4c8d71 --- /dev/null +++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh @@ -0,0 +1,138 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. 
+#
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+# BASEDIR is the parent of the directory holding this script; quoted so paths with spaces work.
+BASEDIR=`dirname "${PRG}"`
+BASEDIR=`cd "${BASEDIR}/.." && pwd`
+
+if test -z "${JAVA_HOME}"
+then
+    JAVA_BIN=`which java`
+    JAR_BIN=`which jar`
+else
+    JAVA_BIN="${JAVA_HOME}/bin/java"
+    JAR_BIN="${JAVA_HOME}/bin/jar"
+fi
+export JAVA_BIN
+
+if [ ! -e "${JAVA_BIN}" ] || [ ! -e "${JAR_BIN}" ]; then
+  echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available."
+  exit 1
+fi
+
+# Construct the Atlas classpath from every jar in ${BASEDIR}/lib, then append the conf directory.
+for i in "${BASEDIR}/lib/"*.jar; do
+  ATLASCPPATH="${ATLASCPPATH}:$i"
+done
+
+if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then
+    ATLAS_CONF_DIR="${BASEDIR}/conf/"
+fi
+ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR}
+
+# log dir for applications
+ATLAS_LOG_DIR="${BASEDIR}/log"
+export ATLAS_LOG_DIR
+LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log"
+# Run timestamp as yyyymmddHHMMSS: %S is seconds of the minute (%s would append epoch seconds).
+TIME=`date +%Y%m%d%H%M%S`
+
+CP="${ATLASCPPATH}"
+
+# If running in cygwin, convert pathnames and classpath to Windows format.
+if [ "${CYGWIN}" == "true" ] +then + ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}` + LOGFILE=`cygpath -w ${LOGFILE}` + HIVE_CP=`cygpath -w ${HIVE_CP}` + HADOOP_CP=`cygpath -w ${HADOOP_CP}` + CP=`cygpath -w -p ${CP}` +fi + +JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=atlas-trino-extractor.log +-Dlog4j.configuration=atlas-log4j.xml -Djdk.httpclient.HttpClient.log=requests -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006" + +IMPORT_ARGS=() +JVM_ARGS= + +set -f +while true +do + option=${1} + shift + + case "${option}" in + -c) IMPORT_ARGS+=("-c" "$1"); shift;; + -s) IMPORT_ARGS+=("-s" "$1"); shift;; + -t) IMPORT_ARGS+=("-t" "$1"); shift;; + -cx) + CRON_EXPR="$1" + shift + while [[ "$1" != "" && "$1" != -* ]]; do + CRON_EXPR="$CRON_EXPR $1" + shift + done + IMPORT_ARGS+=("-cx" "$CRON_EXPR");; + -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");; + --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;; + --table) IMPORT_ARGS+=("--table" "$1"); shift;; + --schema) IMPORT_ARGS+=("--schema" "$1"); shift;; + --cronExpression) + CRON_EXPR="$1" + shift + while [[ "$1" != "" && "$1" != -* ]]; do + CRON_EXPR="$CRON_EXPR $1" + shift + done + IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");; + --help) export HELP_OPTION="true"; IMPORT_ARGS+=("--help");; + -*) + echo "Invalid argument found" + export HELP_OPTION="true"; IMPORT_ARGS+=("--help") + break;; + "") break;; + esac +done + +JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}" + +if [ -z ${HELP_OPTION} ]; then + echo "Log file for import is $LOGFILE" +fi + +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}" + +set +f + +RETVAL=$? +if [ -z ${HELP_OPTION} ]; then + [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully! + [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at: $LOGFILE for details. 
+fi + +exit $RETVAL diff --git a/addons/trino-extractor/src/main/conf/atlas-application.properties b/addons/trino-extractor/src/main/conf/atlas-application.properties new file mode 100755 index 000000000..d709ad016 --- /dev/null +++ b/addons/trino-extractor/src/main/conf/atlas-application.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +######## Atlas connection ############ +atlas.rest.address=http://localhost:21000/ + +######## Trino connection ############ +atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/ +atlas.trino.jdbc.user=<username> + +######## Trino environment name ###### +atlas.trino.namespace=cm +#atlas.trino.catalogs.registered= + +######## Datasource for which ######## +######## Atlas hook is enabled ####### +#atlas.trino.catalog.hook.enabled.hive_catalog=true +#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm \ No newline at end of file diff --git a/addons/trino-extractor/src/main/conf/atlas-log4j.xml b/addons/trino-extractor/src/main/conf/atlas-log4j.xml new file mode 100644 index 000000000..371c3b3e3 --- /dev/null +++ b/addons/trino-extractor/src/main/conf/atlas-log4j.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="FILE" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/> + <param name="Append" value="true"/> + <param name="maxFileSize" value="100MB" /> + <param name="maxBackupIndex" value="20" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <logger name="org.apache.atlas.trino" additivity="false"> + <level value="info"/> + <appender-ref ref="FILE"/> + </logger> + + <root> + <priority value="warn"/> + <appender-ref ref="FILE"/> + </root> +</log4j:configuration> diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java new file mode 100644 index 000000000..4cc14969a --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.configuration.Configuration; + +import java.io.IOException; + +public class ExtractorContext { + static final String TRINO_NAMESPACE_CONF = "atlas.trino.namespace"; + static final String DEFAULT_TRINO_NAMESPACE = "cm"; + static final String OPTION_CATALOG_SHORT = "c"; + static final String OPTION_CATALOG_LONG = "catalog"; + static final String OPTION_SCHEMA_SHORT = "s"; + static final String OPTION_SCHEMA_LONG = "schema"; + static final String OPTION_TABLE_SHORT = "t"; + static final String OPTION_TABLE_LONG = "table"; + static final String OPTION_CRON_EXPRESSION_SHORT = "cx"; + static final String OPTION_CRON_EXPRESSION_LONG = "cronExpression"; + static final String OPTION_HELP_SHORT = "h"; + static final String OPTION_HELP_LONG = "help"; + + private final Configuration atlasConf; + private final String namespace; + private final String catalog; + private final String schema; + private final String table; + private final AtlasClientHelper atlasClientHelper; + private final TrinoClientHelper trinoClientHelper; + private final String cronExpression; + + public ExtractorContext(CommandLine cmd) throws AtlasException, IOException { + this.atlasConf = getAtlasProperties(); + this.atlasClientHelper = createAtlasClientHelper(); + this.trinoClientHelper = createTrinoClientHelper(); + this.namespace = atlasConf.getString(TRINO_NAMESPACE_CONF, DEFAULT_TRINO_NAMESPACE); + this.catalog = cmd.getOptionValue(OPTION_CATALOG_SHORT); + this.schema = cmd.getOptionValue(OPTION_SCHEMA_SHORT); + this.table = cmd.getOptionValue(OPTION_TABLE_SHORT); + this.cronExpression = cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT); + } + + public Configuration getAtlasConf() { + 
return atlasConf; + } + + public AtlasClientHelper getAtlasConnector() { + return atlasClientHelper; + } + + public TrinoClientHelper getTrinoConnector() { + return trinoClientHelper; + } + + public String getTable() { + return table; + } + + public String getSchema() { + return schema; + } + + public String getCatalog() { + return catalog; + } + + public String getNamespace() { + return namespace; + } + + public String getCronExpression() { + return cronExpression; + } + + private Configuration getAtlasProperties() throws AtlasException { + return ApplicationProperties.get(); + } + + private TrinoClientHelper createTrinoClientHelper() { + return new TrinoClientHelper(atlasConf); + } + + private AtlasClientHelper createAtlasClientHelper() throws IOException { + return new AtlasClientHelper(atlasConf); + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java new file mode 100644 index 000000000..554a32a21 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + + public boolean execute(ExtractorContext context) throws Exception { + Map<String, String> catalogs = context.getTrinoConnector().getAllTrinoCatalogs(); + + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + + return true; + } + + private void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + } else { + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = context.getAtlasConf().getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(context, registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context, context.getCatalog(), catalogInTrino.get(context.getCatalog())); + + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.info("No catalogs found to extract"); + } else { + LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + + AtlasEntityWithExtInfo trinoInstanceEntity = context.getAtlasConnector().createOrUpdateInstanceEntity(context.getNamespace()); + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + + try { + for (Catalog currentCatalog : catalogsToProcess) { + catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(context, currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + }); + } + } finally { + catalogExecutor.shutdown(); + } + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within {} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } 
catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scan completed"); + } + } + } + + private void processCatalog(ExtractorContext context, Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting catalog: {}", catalog.getName()); + + String catalogName = catalog.getName(); + AtlasEntityWithExtInfo trinoCatalogEntity = context.getAtlasConnector().createOrUpdateCatalogEntity(catalog); + List<String> schemas = context.getTrinoConnector().getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + + LOG.info("Found {} schemas in catalog {}", schemas.size(), catalogName); + + processSchemas(context, catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(context, schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + private void processSchemas(ExtractorContext context, Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { + for (String schemaName : schemaToImport) { + LOG.info("Started extracting schema: {}", schemaName); + + try { + AtlasEntityWithExtInfo schemaEntity = context.getAtlasConnector().createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); + Map<String, Map<String, Object>> tables = context.getTrinoConnector().getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + + LOG.info("Found {} tables in schema {}.{}", tables.size(), catalog.getName(), schemaName); + + processTables(context, catalog, schemaName, schemaEntity.getEntity(), tables); + + if (StringUtils.isEmpty(context.getTable())) { + deleteTables(context, new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid()); + } + } catch (Exception e) { + LOG.error("Error processing schema: {}", schemaName, e); + } + } + } + + private void processTables(ExtractorContext context, Catalog catalog, String 
schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>> trinoTables) { + for (String trinoTableName : trinoTables.keySet()) { + LOG.info("Started extracting table: {}", trinoTableName); + + try { + Map<String, Map<String, Object>> trinoColumns = context.getTrinoConnector().getTrinoColumns(catalog.getName(), schemaName, trinoTableName); + + LOG.info("Found {} columns in table {}.{}.{}", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName); + + context.getAtlasConnector().createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, schemaEntity); + } catch (Exception e) { + LOG.error("Error processing table: {}", trinoTableName, e); + } + } + } + + private void deleteCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (StringUtils.isEmpty(context.getCatalog())) { + AtlasEntityHeader trinoInstance = context.getAtlasConnector().getTrinoInstance(context.getNamespace()); + + if (trinoInstance != null) { + Set<String> catalogsToDelete = getCatalogsToDelete(context, catalogInTrino, trinoInstance.getGuid()); + + if (CollectionUtils.isNotEmpty(catalogsToDelete)) { + LOG.info("Atlas has {} catalogs in instance {} that are no more present in Trino, starting to delete", catalogsToDelete, trinoInstance.getGuid()); + + for (String catalogGuid : catalogsToDelete) { + try { + deleteSchemas(context, Collections.emptyList(), catalogGuid); + + LOG.info("Deleting catalog: {}", catalogGuid); + + context.getAtlasConnector().deleteByGuid(Collections.singleton(catalogGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error deleting catalog: {}", catalogGuid, e); + } + } + } else { + LOG.info("Atlas has no catalogs to delete"); + } + } + + LOG.info("Catalogs scanned for deletion completed"); + } + } + + private void deleteSchemas(ExtractorContext context, List<String> schemasInTrino, String catalogGuid) { + try { + Set<String> schemasToDelete = 
getSchemasToDelete(context, schemasInTrino, catalogGuid); + + if (CollectionUtils.isNotEmpty(schemasToDelete)) { + LOG.info("Atlas has {} schemas in catalog {} that are no more present in Trino, starting to delete", schemasToDelete.size(), catalogGuid); + + for (String schemaGuid : schemasToDelete) { + try { + deleteTables(context, Collections.emptyList(), schemaGuid); + + LOG.info("Deleting schema: {}", schemaGuid); + + context.getAtlasConnector().deleteByGuid(Collections.singleton(schemaGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error in deleting schema: {}", schemaGuid, e); + } + } + } else { + LOG.info("Atlas has no schemas to delete in catalog {}", catalogGuid); + } + } catch (AtlasServiceException e) { + LOG.error("Error deleting schemas in catalog {}", catalogGuid, e); + } + } + + private void deleteTables(ExtractorContext context, List<String> tablesInTrino, String schemaGuid) { + try { + Set<String> tablesToDelete = getTablesToDelete(context, tablesInTrino, schemaGuid); + + if (CollectionUtils.isNotEmpty(tablesToDelete)) { + LOG.info("Atlas has {} tables in schema {} that are no more present in Trino, starting to delete", tablesToDelete.size(), schemaGuid); + + for (String tableGuid : tablesToDelete) { + try { + LOG.info("Deleting table: {}", tableGuid); + + context.getAtlasConnector().deleteByGuid(Collections.singleton(tableGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error deleting table: {}", tableGuid, e); + } + } + } else { + LOG.info("Atlas has no tables to delete in schema {}", schemaGuid); + } + } catch (AtlasServiceException e) { + LOG.error("Error deleting tables in schema: {}", schemaGuid, e); + } + } + + private static Catalog getCatalogInstance(ExtractorContext context, String catalogName, String connectorType) { + Catalog ret = null; + + if (catalogName != null) { + boolean isHookEnabled = context.getAtlasConf().getBoolean(TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName, false); + String hookNamespace = null; + + 
if (isHookEnabled) { + String hookNamespaceProperty = TRINO_CATALOG_HOOK_ENABLED_PREFIX + catalogName + TRINO_CATALOG_HOOK_ENABLED_SUFFIX; + + hookNamespace = context.getAtlasConf().getString(hookNamespaceProperty); + + if (hookNamespace == null) { + LOG.warn("Atlas Hook is enabled for {}, but '{}' found empty", catalogName, hookNamespaceProperty); + } + } + + ret = new Catalog(catalogName, connectorType, isHookEnabled, hookNamespace, context.getNamespace()); + } + + return ret; + } + + private static Set<String> getCatalogsToDelete(ExtractorContext context, Map<String, String> catalogInTrino, String instanceGuid) throws AtlasServiceException { + Set<String> ret = Collections.emptySet(); + + if (instanceGuid != null) { + List<AtlasEntityHeader> catalogsInAtlas = context.getAtlasConnector().getAllCatalogsInInstance(instanceGuid); + + if (CollectionUtils.isNotEmpty(catalogsInAtlas)) { + Map<String, String> finalCatalogInTrino = catalogInTrino == null ? Collections.emptyMap() : catalogInTrino; + + ret = catalogsInAtlas.stream() + .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null) + .filter(entity -> !finalCatalogInTrino.containsKey(entity.getAttribute(TRINO_NAME_ATTRIBUTE))) + .map(AtlasEntityHeader::getGuid) + .collect(Collectors.toSet()); + } + } + + return ret; + } + + private static Set<String> getSchemasToDelete(ExtractorContext context, List<String> schemasInTrino, String catalogGuid) throws AtlasServiceException { + Set<String> ret = Collections.emptySet(); + + if (catalogGuid != null) { + List<AtlasEntityHeader> schemasInAtlas = context.getAtlasConnector().getAllSchemasInCatalog(catalogGuid); + + if (CollectionUtils.isNotEmpty(schemasInAtlas)) { + List<String> finalSchemasInTrino = schemasInTrino == null ? 
Collections.emptyList() : schemasInTrino; + + ret = schemasInAtlas.stream() + .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null) + .filter(entity -> !finalSchemasInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE))) + .map(AtlasEntityHeader::getGuid) + .collect(Collectors.toSet()); + } + } + + return ret; + } + + private static Set<String> getTablesToDelete(ExtractorContext context, List<String> tablesInTrino, String schemaGuid) throws AtlasServiceException { + Set<String> ret = Collections.emptySet(); + + if (schemaGuid != null) { + List<AtlasEntityHeader> tablesInAtlas = context.getAtlasConnector().getAllTablesInSchema(schemaGuid); + + if (CollectionUtils.isNotEmpty(tablesInAtlas)) { + List<String> finalTablesInTrino = tablesInTrino == null ? Collections.emptyList() : tablesInTrino; + + ret = tablesInAtlas.stream() + .filter(entity -> entity.getAttribute(TRINO_NAME_ATTRIBUTE) != null) + .filter(entity -> !finalTablesInTrino.contains(entity.getAttribute(TRINO_NAME_ATTRIBUTE))) + .map(AtlasEntityHeader::getGuid) + .collect(Collectors.toSet()); + } + } + + return ret; + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java new file mode 100644 index 000000000..df1bd7420 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; +import org.quartz.CronExpression; +import org.quartz.CronScheduleBuilder; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG; +import 
static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT; + +public class TrinoExtractor { + private static final Logger LOG = LoggerFactory.getLogger(TrinoExtractor.class); + + private static final int EXIT_CODE_SUCCESS = 0; + private static final int EXIT_CODE_FAILED = 1; + private static final int EXIT_CODE_HELP = 2; + + private static TrinoExtractor instance; + + private final ExtractorContext extractorContext; + private int exitCode = EXIT_CODE_FAILED; + + private TrinoExtractor(String[] args) throws Exception { + extractorContext = createExtractorContext(args); + } + + public static void main(String[] args) { + try { + instance = new TrinoExtractor(args); + + if (instance.extractorContext != null) { + instance.run(); + } else { + LOG.error("Extractor context is null. Cannot proceed with extraction."); + + instance.exitCode = EXIT_CODE_FAILED; + } + } catch (Exception e) { + LOG.error("Extraction failed", e); + + instance.exitCode = EXIT_CODE_FAILED; + } + + System.exit(instance != null ? 
instance.exitCode : EXIT_CODE_FAILED); + } + + private void run() { + try { + String cronExpression = extractorContext.getCronExpression(); + + if (StringUtils.isNotEmpty(cronExpression)) { + if (!CronExpression.isValidExpression(cronExpression)) { + LOG.error("Invalid cron expression: {}", cronExpression); + + exitCode = EXIT_CODE_FAILED; + } else { + LOG.info("Scheduling extraction for cron expression: {}", cronExpression); + + JobDetail job = JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", "group1").build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow() + .build(); + + Scheduler scheduler = null; + + try { + scheduler = new StdSchedulerFactory().getScheduler(); + + scheduler.scheduleJob(job, trigger); + scheduler.start(); + + try { + Thread.currentThread().join(); + } catch (InterruptedException ie) { + LOG.info("Main thread interrupted, shutting down scheduler."); + + exitCode = EXIT_CODE_SUCCESS; + } + } finally { + try { + if (scheduler != null && !scheduler.isShutdown()) { + scheduler.shutdown(true); + } + } catch (SchedulerException se) { + LOG.warn("Error shutting down scheduler: {}", se.getMessage()); + } + } + } + } else { + LOG.info("Running extraction once"); + + ExtractorService extractorService = new ExtractorService(); + + if (extractorService.execute(extractorContext)) { + exitCode = EXIT_CODE_SUCCESS; + } + } + } catch (Exception e) { + LOG.error("Error encountered. 
exitCode: {}", exitCode, e); + } finally { + if (extractorContext.getAtlasConnector() != null) { + extractorContext.getAtlasConnector().close(); + } + } + + System.exit(exitCode); + } + + private ExtractorContext createExtractorContext(String[] args) throws AtlasBaseException, IOException { + Options acceptedCliOptions = prepareCommandLineOptions(); + + try { + CommandLine cmd = new BasicParser().parse(acceptedCliOptions, args, true); + List<String> argsNotProcessed = cmd.getArgList(); + + if (argsNotProcessed != null && !argsNotProcessed.isEmpty()) { + throw new AtlasBaseException("Unrecognized arguments."); + } + + ExtractorContext ret = null; + + if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) { + printUsage(acceptedCliOptions); + exitCode = EXIT_CODE_HELP; + } else { + ret = new ExtractorContext(cmd); + LOG.debug("Successfully initialized the extractor context."); + } + + return ret; + } catch (ParseException | AtlasBaseException e) { + printUsage(acceptedCliOptions); + + throw new AtlasBaseException("Invalid arguments. Reason: " + e.getMessage(), e); + } catch (AtlasException e) { + throw new AtlasBaseException("Error in getting Application Properties. 
Reason: " + e.getMessage(), e); + } catch (IOException e) { + throw new IOException(e); + } + } + + private static Options prepareCommandLineOptions() { + Options acceptedCliOptions = new Options(); + + return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT, OPTION_CATALOG_LONG, true, "Catalog name") + .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true, "Schema name") + .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name") + .addOption(OPTION_CRON_EXPRESSION_SHORT, OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction") + .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print help message"); + } + + private static void printUsage(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(TrinoExtractor.class.getName(), options); + } + + @DisallowConcurrentExecution + public static class MetadataJob implements Job { + private final Logger LOG = LoggerFactory.getLogger(MetadataJob.class); + + public void execute(JobExecutionContext context) throws JobExecutionException { + ExtractorContext extractorContext = TrinoExtractor.instance != null ? 
TrinoExtractor.instance.extractorContext : null; + + if (extractorContext != null) { + LOG.info("Executing metadata extraction at: {}", java.time.LocalTime.now()); + + ExtractorService extractorService = new ExtractorService(); + + try { + if (extractorService.execute(extractorContext)) { + TrinoExtractor.instance.exitCode = EXIT_CODE_SUCCESS; + } + } catch (Exception e) { + LOG.error("Error encountered: ", e); + throw new JobExecutionException(e); + } + LOG.info("Completed executing metadata extraction at: {}", java.time.LocalTime.now()); + } + } + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java new file mode 100644 index 000000000..70e623847 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.client; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.trino.model.Catalog; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; +

/**
 * Thin wrapper around {@link AtlasClientV2} that creates, updates, looks up and
 * deletes the Trino entities (instance, catalog, schema, table, column) in Atlas.
 */
public class AtlasClientHelper {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasClientHelper.class);

    public static final String TRINO_INSTANCE                   = "trino_instance";
    public static final String TRINO_CATALOG                    = "trino_catalog";
    public static final String TRINO_SCHEMA                     = "trino_schema";
    public static final String TRINO_TABLE                      = "trino_table";
    public static final String TRINO_COLUMN                     = "trino_column";
    public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs";
    public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE   = "schemas";
    public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE     = "tables";
    public static final String QUALIFIED_NAME_ATTRIBUTE         = "qualifiedName";
    public static final String NAME_ATTRIBUTE                   = "name";
    public static final int    DEFAULT_PAGE_LIMIT               = 10000;

    private static final String DEFAULT_ATLAS_URL                       = "http://localhost:21000/";
    private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT     = "atlas.rest.address";
    private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE  = "connectorType";
    private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE        = "instance";
    private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP     = "trino_instance_catalog";
    private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE          = "catalog";
    private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP       = "trino_schema_catalog";
    private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE        = "data_type";
    private static final String TRINO_COLUMN_ORDINAL_POSITION_ATTRIBUTE = "ordinal_position";
    private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE   = "column_default";
    private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE      = "is_nullable";
    private static final String TRINO_COLUMN_TABLE_ATTRIBUTE            = "table";
    private static final String TRINO_TABLE_TYPE                        = "table_type";
    private static final String TRINO_TABLE_COLUMN_RELATIONSHIP         = "trino_table_columns";
    private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP         = "trino_table_schema";
    private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE            = "trinoschema";
    private static final String TRINO_TABLE_COLUMN_ATTRIBUTE            = "columns";

    // One client shared by every helper instance; created lazily, released by close().
    private static AtlasClientV2 atlasClientV2;

    /**
     * Creates the helper, initialising the shared Atlas client on first use.
     *
     * @param atlasConf Atlas application configuration; may be {@code null}, in which case the default endpoint is used
     * @throws IOException if the client cannot be created
     */
    public AtlasClientHelper(Configuration atlasConf) throws IOException {
        atlasClientV2 = getAtlasClientV2Instance(atlasConf);
    }

    /**
     * Lists the catalog entities related to a Trino instance entity.
     *
     * @param instanceGuid guid of the Trino instance entity
     * @return the catalog headers, possibly empty
     * @throws AtlasServiceException if the relationship search fails
     */
    public List<AtlasEntityHeader> getAllCatalogsInInstance(String instanceGuid) throws AtlasServiceException {
        List<AtlasEntityHeader> entities = getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE);

        if (CollectionUtils.isNotEmpty(entities)) {
            LOG.debug("Retrieved {} catalogs in Trino instance {}", entities.size(), instanceGuid);
        } else {
            LOG.debug("No catalog found in Trino instance {}", instanceGuid);
        }

        return entities;
    }

    /**
     * Lists the schema entities related to a Trino catalog entity.
     *
     * @param catalogGuid guid of the Trino catalog entity
     * @return the schema headers, possibly empty
     * @throws AtlasServiceException if the relationship search fails
     */
    public List<AtlasEntityHeader> getAllSchemasInCatalog(String catalogGuid) throws AtlasServiceException {
        List<AtlasEntityHeader> entities = getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE);

        if (CollectionUtils.isNotEmpty(entities)) {
            LOG.debug("Retrieved {} schemas in Trino catalog {}", entities.size(), catalogGuid);
        } else {
            LOG.debug("No schema found in Trino catalog {}", catalogGuid);
        }

        return entities;
    }

    /**
     * Lists the table entities related to a Trino schema entity.
     *
     * @param schemaGuid guid of the Trino schema entity
     * @return the table headers, possibly empty
     * @throws AtlasServiceException if the relationship search fails
     */
    public List<AtlasEntityHeader> getAllTablesInSchema(String schemaGuid) throws AtlasServiceException {
        List<AtlasEntityHeader> entities = getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE);

        if (CollectionUtils.isNotEmpty(entities)) {
            LOG.debug("Retrieved {} tables in Trino schema {}", entities.size(), schemaGuid);
        } else {
            LOG.debug("No table found in Trino schema {}", schemaGuid);
        }

        return entities;
    }

    /**
     * Looks up the Trino instance entity whose qualified name equals the namespace.
     *
     * @param namespace Trino namespace, used as the instance's qualified name
     * @return the entity header, or {@code null} when the lookup fails for any reason
     */
    public AtlasEntityHeader getTrinoInstance(String namespace) {
        try {
            return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE, Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace));
        } catch (AtlasServiceException e) {
            // Best effort: callers treat null as "no instance", but keep the cause visible for diagnosis.
            LOG.debug("Failed to retrieve Trino instance {}", namespace, e);

            return null;
        }
    }

    /**
     * Returns the Trino instance entity for the namespace, creating it when absent.
     *
     * @param trinoNamespace Trino namespace, used as both qualified name and name
     * @return the existing or newly created entity
     * @throws AtlasServiceException if the lookup or the creation fails
     */
    public AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String trinoNamespace) throws AtlasServiceException {
        String                 qualifiedName = trinoNamespace;
        AtlasEntityWithExtInfo ret           = findEntity(TRINO_INSTANCE, qualifiedName, true, true);

        if (ret == null) {
            AtlasEntity entity = new AtlasEntity(TRINO_INSTANCE);

            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
            entity.setAttribute(NAME_ATTRIBUTE, trinoNamespace);

            ret = createEntity(new AtlasEntityWithExtInfo(entity));
        }

        return ret;
    }

    /**
     * Creates the catalog entity, or updates its instance relationship when it already
     * exists; the catalog's connector, if any, is given the chance to link it to the
     * underlying source.
     *
     * @param catalog catalog model
     * @return the created or updated entity
     * @throws AtlasServiceException if the lookup, creation or update fails
     */
    public AtlasEntityWithExtInfo createOrUpdateCatalogEntity(Catalog catalog) throws AtlasServiceException {
        String catalogName    = catalog.getName();
        String trinoNamespace = catalog.getInstanceName();
        String qualifiedName  = String.format("%s@%s", catalogName, trinoNamespace);

        AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG, qualifiedName, true, true);

        if (ret == null) {
            AtlasEntity entity = new AtlasEntity(TRINO_CATALOG);

            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
            entity.setAttribute(NAME_ATTRIBUTE, catalogName);
            entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE, catalog.getType());
            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(), TRINO_CATALOG_INSTANCE_RELATIONSHIP));

            if (catalog.getConnector() != null) {
                // NOTE(review): ret is null on this path, so the connector receives a null ext-info - confirm connectors tolerate it.
                catalog.getConnector().connectTrinoCatalog(this, catalog.getHookInstanceName(), catalogName, entity, ret);
            }

            ret = createEntity(new AtlasEntityWithExtInfo(entity));
        } else {
            AtlasEntity entity = ret.getEntity();

            entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(), TRINO_CATALOG_INSTANCE_RELATIONSHIP));

            if (catalog.getConnector() != null) {
                catalog.getConnector().connectTrinoCatalog(this, catalog.getHookInstanceName(), catalogName, entity, ret);
            }

            ret.setEntity(entity);

            updateEntity(ret);
        }

        return ret;
    }

    /**
     * Creates the schema entity, or updates its catalog relationship when it already
     * exists; the catalog's connector, if any, is given the chance to link it to the
     * underlying source.
     *
     * @param catalog       catalog model that owns the schema
     * @param catalogEntity Atlas entity of the owning catalog
     * @param schema        schema name
     * @return the created or updated entity
     * @throws AtlasServiceException if the lookup, creation or update fails
     */
    public AtlasEntityWithExtInfo createOrUpdateSchemaEntity(Catalog catalog, AtlasEntity catalogEntity, String schema) throws AtlasServiceException {
        String                 qualifiedName = String.format("%s.%s@%s", catalog.getName(), schema, catalog.getInstanceName());
        AtlasEntityWithExtInfo ret           = findEntity(TRINO_SCHEMA, qualifiedName, true, true);

        if (ret == null) {
            AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA);

            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
            entity.setAttribute(NAME_ATTRIBUTE, schema);
            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, TRINO_SCHEMA_CATALOG_RELATIONSHIP));

            if (catalog.getConnector() != null) {
                // NOTE(review): ret is null on this path, so the connector receives a null ext-info - confirm connectors tolerate it.
                catalog.getConnector().connectTrinoSchema(this, catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
            }

            ret = createEntity(new AtlasEntityWithExtInfo(entity));
        } else {
            AtlasEntity entity = ret.getEntity();

            entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, TRINO_SCHEMA_CATALOG_RELATIONSHIP));

            if (catalog.getConnector() != null) {
                catalog.getConnector().connectTrinoSchema(this, catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret);
            }

            ret.setEntity(entity);

            updateEntity(ret);
        }

        return ret;
    }

    /**
     * Creates or updates the table entity together with its column entities.
     *
     * @param catalog       catalog model that owns the table
     * @param schema        schema name
     * @param table         table name
     * @param tableMetadata table metadata read from Trino (the {@code table_type} entry is used)
     * @param trinoColumns  column name to column metadata, as read from Trino
     * @param schemaEntity  Atlas entity of the owning schema
     * @return the created or updated entity
     * @throws AtlasServiceException if the lookup, creation or update fails
     */
    public AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws AtlasServiceException {
        String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(), schema, table, catalog.getInstanceName());

        AtlasEntityWithExtInfo ret;
        AtlasEntityWithExtInfo tableEntityExt = findEntity(TRINO_TABLE, qualifiedName, true, true);

        if (tableEntityExt == null) {
            tableEntityExt = toTableEntity(catalog, schema, table, tableMetadata, trinoColumns, schemaEntity, tableEntityExt);
            ret            = createEntity(tableEntityExt);
        } else {
            ret = toTableEntity(catalog, schema, table, tableMetadata, trinoColumns, schemaEntity, tableEntityExt);

            updateEntity(ret);
        }

        return ret;
    }

    /**
     * Deletes the entities with the given guids, one request per guid.
     *
     * @param guidTodelete guids to delete; {@code null} or empty is a no-op
     * @throws AtlasServiceException if a delete request fails
     */
    public void deleteByGuid(Set<String> guidTodelete) throws AtlasServiceException {
        if (CollectionUtils.isNotEmpty(guidTodelete)) {
            for (String guid : guidTodelete) {
                EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid);

                // Null-safe check: the response may carry no DELETE mutations at all.
                if (response == null || CollectionUtils.isEmpty(response.getDeletedEntities())) {
                    LOG.debug("Entity with guid : {} is not deleted", guid);
                } else {
                    LOG.debug("Entity with guid : {} is deleted", guid);
                }
            }
        }
    }

    /**
     * Looks up an entity by type and qualified name.
     *
     * @param typeName           Atlas type name
     * @param qualifiedName      qualified name of the entity
     * @param minExtInfo         passed through to the Atlas client
     * @param ignoreRelationship passed through to the Atlas client
     * @return the entity, or {@code null} when Atlas answers NOT_FOUND
     * @throws AtlasServiceException for any failure other than NOT_FOUND
     */
    public AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName, boolean minExtInfo, boolean ignoreRelationship) throws AtlasServiceException {
        try {
            return atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo, ignoreRelationship);
        } catch (AtlasServiceException e) {
            if (e.getStatus() == ClientResponse.Status.NOT_FOUND) {
                return null;
            }

            throw e;
        }
    }

    /**
     * Closes and discards the shared Atlas client, if any.
     */
    public void close() {
        if (atlasClientV2 != null) {
            atlasClientV2.close();

            atlasClientV2 = null;
        }
    }

    /**
     * Returns the shared Atlas client, creating it on first use with basic or Kerberos
     * authentication depending on {@link AuthenticationUtil#isKerberosAuthenticationEnabled()}.
     *
     * Static so that the lock is the class monitor, matching the static field it guards.
     *
     * @param atlasConf Atlas application configuration; may be {@code null}
     * @return the shared client
     * @throws IOException if the client cannot be created
     */
    private static synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration atlasConf) throws IOException {
        if (atlasClientV2 == null) {
            String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};

            if (atlasConf != null && ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT))) {
                atlasEndpoint = atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
            }

            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
                String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput();

                atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword);
            } else {
                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint);
            }
        }

        return atlasClientV2;
    }

    /**
     * Pages through a relationship search and collects every related entity header.
     *
     * @param entityGuid                guid of the entity whose relations are listed; {@code null} yields an empty list
     * @param relationshipAttributeName relationship attribute to follow
     * @return all related entity headers, possibly empty
     * @throws AtlasServiceException if a search request fails
     */
    private List<AtlasEntityHeader> getAllRelationshipEntities(String entityGuid, String relationshipAttributeName) throws AtlasServiceException {
        List<AtlasEntityHeader> entities = new ArrayList<>();

        if (entityGuid != null) {
            final int pageSize = DEFAULT_PAGE_LIMIT;

            for (int i = 0; ; i++) {
                int offset = pageSize * i;

                LOG.debug("Retrieving: offset={}, pageSize={}", offset, pageSize);

                AtlasSearchResult       searchResult  = atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null, null, true, pageSize, offset);
                List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities();
                int                     count         = entityHeaders == null ? 0 : entityHeaders.size();

                if (count > 0) {
                    entities.addAll(entityHeaders);
                }

                if (count < pageSize) { // last page
                    break;
                }
            }
        }

        return entities;
    }

    /**
     * Fills a table entity (new when {@code tableEntityExt} is {@code null}) with its
     * attributes, its schema relationship and one column entity per Trino column.
     *
     * @param catalog        catalog model that owns the table
     * @param schema         schema name
     * @param table          table name
     * @param tableMetadata  table metadata read from Trino; a missing map or {@code table_type} leaves the type unset
     * @param trinoColumns   column name to column metadata
     * @param schemaEntity   Atlas entity of the owning schema
     * @param tableEntityExt existing table entity to update, or {@code null} to build a new one
     * @return the populated table entity with the schema and columns as referred entities
     */
    private AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, AtlasEntityWithExtInfo tableEntityExt) {
        if (tableEntityExt == null) {
            tableEntityExt = new AtlasEntityWithExtInfo(new AtlasEntity(TRINO_TABLE));
        }

        AtlasEntity       tableEntity    = tableEntityExt.getEntity();
        List<AtlasEntity> columnEntities = new ArrayList<>();
        String            qualifiedName  = String.format("%s.%s.%s@%s", catalog.getName(), schema, table, catalog.getInstanceName());
        // Null-safe: a missing table type must not abort the import of the whole table.
        Object            tableType      = tableMetadata == null ? null : tableMetadata.get(TRINO_TABLE_TYPE);

        tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName);
        tableEntity.setAttribute(NAME_ATTRIBUTE, table);
        tableEntity.setAttribute(TRINO_TABLE_TYPE, tableType == null ? null : tableType.toString());

        for (Map.Entry<String, Map<String, Object>> columnEntry : trinoColumns.entrySet()) {
            AtlasEntity entity           = new AtlasEntity(TRINO_COLUMN);
            String      columnName       = columnEntry.getKey();
            String      colQualifiedName = String.format("%s.%s.%s.%s@%s", catalog.getName(), schema, table, columnName, catalog.getInstanceName());

            entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName);
            entity.setAttribute(NAME_ATTRIBUTE, columnName);

            if (MapUtils.isNotEmpty(columnEntry.getValue())) {
                Map<String, Object> columnAttr = columnEntry.getValue();

                entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE));
                entity.setAttribute(TRINO_COLUMN_ORDINAL_POSITION_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_ORDINAL_POSITION_ATTRIBUTE));
                entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE));
                entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE));
            }

            entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity, TRINO_TABLE_COLUMN_RELATIONSHIP));

            columnEntities.add(entity);
        }

        tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity, TRINO_TABLE_SCHEMA_RELATIONSHIP));
        tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities, TRINO_TABLE_COLUMN_RELATIONSHIP));

        if (catalog.getConnector() != null) {
            catalog.getConnector().connectTrinoTable(this, catalog.getHookInstanceName(), catalog.getName(), schema, table, tableEntity, columnEntities, tableEntityExt);
        }

        tableEntityExt.addReferredEntity(schemaEntity);

        for (AtlasEntity column : columnEntities) {
            tableEntityExt.addReferredEntity(column);
        }

        return tableEntityExt;
    }

    /**
     * Creates the entity in Atlas and reads back everything that was created.
     *
     * The first created entity becomes the primary entity of the result; every other
     * created entity is attached as a referred entity. Relationship attributes are
     * cleared from the result before it is returned.
     *
     * @param entity entity (with referred entities) to create
     * @return the created entity as stored in Atlas, or {@code null} when Atlas reported no creation
     * @throws AtlasServiceException if the creation or the read-back fails
     */
    private AtlasEntityWithExtInfo createEntity(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
        LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity);

        AtlasEntityWithExtInfo  ret             = null;
        EntityMutationResponse  response        = atlasClientV2.createEntity(entity);
        // Guard against a null response instead of failing with an NPE.
        List<AtlasEntityHeader> createdEntities = response == null ? null : response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE);

        if (CollectionUtils.isNotEmpty(createdEntities)) {
            for (AtlasEntityHeader createdEntity : createdEntities) {
                if (ret == null) {
                    ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

                    LOG.debug("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid());
                } else if (ret.getEntity(createdEntity.getGuid()) == null) {
                    AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid());

                    ret.addReferredEntity(newEntity.getEntity());

                    if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) {
                        for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) {
                            ret.addReferredEntity(entry.getKey(), entry.getValue());
                        }
                    }

                    LOG.debug("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid());
                }
            }
        }

        clearRelationshipAttributes(ret);

        return ret;
    }

    /**
     * Sends the entity to Atlas as an update.
     *
     * @param entity entity (with referred entities) to update
     * @throws AtlasServiceException if the update fails
     */
    private void updateEntity(AtlasEntityWithExtInfo entity) throws AtlasServiceException {
        LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity);

        atlasClientV2.updateEntity(entity);

        LOG.debug("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid());
    }

    /**
     * Clears the relationship attributes of the entity and of all its referred entities.
     *
     * @param entity entity to clean; {@code null} is ignored
     */
    private void clearRelationshipAttributes(AtlasEntityWithExtInfo entity) {
        if (entity != null) {
            clearRelationshipAttributes(entity.getEntity());

            if (entity.getReferredEntities() != null) {
                clearRelationshipAttributes(entity.getReferredEntities().values());
            }
        }
    }

    /**
     * Clears the relationship attributes of every entity in the collection.
     *
     * @param entities entities to clean; {@code null} is ignored
     */
    private void clearRelationshipAttributes(Collection<AtlasEntity> entities) {
        if (entities != null) {
            for (AtlasEntity entity : entities) {
                clearRelationshipAttributes(entity);
            }
        }
    }

    /**
     * Clears the relationship attributes of a single entity.
     *
     * @param entity entity to clean; {@code null} is ignored
     */
    private void clearRelationshipAttributes(AtlasEntity entity) {
        if (entity != null && entity.getRelationshipAttributes() != null) {
            entity.getRelationshipAttributes().clear();
        }
    }
}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java new file mode 100644 index 000000000..2ef3f1499 --- /dev/null 
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JDBC helper that reads catalog, schema, table and column metadata from Trino.
+ *
+ * <p>Every public method opens its own connection and closes it before returning.
+ */
+public class TrinoClientHelper {
+    private final String jdbcUrl;
+    private final String username;
+    private final String password;
+
+    /**
+     * Reads the JDBC connection settings from the given configuration.
+     *
+     * @param atlasConf configuration supplying atlas.trino.jdbc.address, atlas.trino.jdbc.user and,
+     *                  optionally, atlas.trino.jdbc.password (defaults to an empty string)
+     */
+    public TrinoClientHelper(Configuration atlasConf) {
+        this.jdbcUrl  = atlasConf.getString("atlas.trino.jdbc.address");
+        this.username = atlasConf.getString("atlas.trino.jdbc.user");
+        this.password = atlasConf.getString("atlas.trino.jdbc.password", "");
+    }
+
+    /**
+     * Lists the catalogs known to Trino.
+     *
+     * @return map of catalog name to connector name, read from system.metadata.catalogs
+     * @throws RuntimeException wrapping the SQLException if the connection or the query fails
+     */
+    public Map<String, String> getAllTrinoCatalogs() {
+        Map<String, String> catalogs = new HashMap<>();
+
+        try (Connection connection = getTrinoConnection();
+                Statement stmt = connection.createStatement();
+                ResultSet rs = stmt.executeQuery("SELECT catalog_name, connector_name FROM system.metadata.catalogs")) {
+            while (rs.next()) {
+                catalogs.put(rs.getString("catalog_name"), rs.getString("connector_name"));
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+
+        return catalogs;
+    }
+
+    /**
+     * Lists the schemas of a catalog.
+     *
+     * @param catalog        catalog whose information_schema.schemata is queried
+     * @param schemaToImport when not empty, only the schema with exactly this name is returned
+     * @return schema names; empty when nothing matches
+     * @throws SQLException if the catalog name is empty or the connection or the query fails
+     */
+    public List<String> getTrinoSchemas(String catalog, String schemaToImport) throws SQLException {
+        List<String> schemas = new ArrayList<>();
+        boolean filterBySchema = StringUtils.isNotEmpty(schemaToImport);
+        StringBuilder query = new StringBuilder();
+
+        query.append("SELECT schema_name FROM ").append(quoteIdentifier(catalog)).append(".information_schema.schemata");
+
+        if (filterBySchema) {
+            query.append(" WHERE schema_name = ?");
+        }
+
+        try (Connection connection = getTrinoConnection();
+                PreparedStatement stmt = connection.prepareStatement(query.toString())) {
+            // bound as a parameter so that quotes in the name cannot alter the statement
+            if (filterBySchema) {
+                stmt.setString(1, schemaToImport);
+            }
+
+            try (ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    schemas.add(rs.getString("schema_name"));
+                }
+            }
+        }
+
+        return schemas;
+    }
+
+    /**
+     * Lists the tables of a schema.
+     *
+     * @param catalog       catalog whose information_schema.tables is queried
+     * @param schema        schema the tables must belong to
+     * @param tableToImport when not empty, only the table with exactly this name is returned
+     * @return map of table name to a metadata map holding the keys table_name and table_type
+     * @throws SQLException if the catalog name is empty or the connection or the query fails
+     */
+    public Map<String, Map<String, Object>> getTrinoTables(String catalog, String schema, String tableToImport) throws SQLException {
+        Map<String, Map<String, Object>> tables = new HashMap<>();
+        boolean filterByTable = StringUtils.isNotEmpty(tableToImport);
+        StringBuilder query = new StringBuilder();
+
+        query.append("SELECT table_name, table_type FROM ").append(quoteIdentifier(catalog)).append(".information_schema.tables WHERE table_schema = ?");
+
+        if (filterByTable) {
+            query.append(" AND table_name = ?");
+        }
+
+        try (Connection connection = getTrinoConnection();
+                PreparedStatement stmt = connection.prepareStatement(query.toString())) {
+            stmt.setString(1, schema);
+
+            if (filterByTable) {
+                stmt.setString(2, tableToImport);
+            }
+
+            try (ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    String tableName = rs.getString("table_name");
+                    Map<String, Object> tableMetadata = new HashMap<>();
+
+                    tableMetadata.put("table_name", tableName);
+                    tableMetadata.put("table_type", rs.getString("table_type"));
+
+                    tables.put(tableName, tableMetadata);
+                }
+            }
+        }
+
+        return tables;
+    }
+
+    /**
+     * Lists the columns of a table.
+     *
+     * @param catalog catalog whose information_schema.columns is queried
+     * @param schema  schema of the table
+     * @param table   table whose columns are listed
+     * @return map of column name to a metadata map holding ordinal_position, column_default, column_name,
+     *         data_type and, when Trino reports it, is_nullable as a Boolean
+     * @throws SQLException if the catalog name is empty or the connection or the query fails
+     */
+    public Map<String, Map<String, Object>> getTrinoColumns(String catalog, String schema, String table) throws SQLException {
+        Map<String, Map<String, Object>> columns = new HashMap<>();
+        String query = "SELECT column_name, ordinal_position, column_default, is_nullable, data_type FROM " + quoteIdentifier(catalog) + ".information_schema.columns WHERE table_schema = ? AND table_name = ?";
+
+        try (Connection connection = getTrinoConnection();
+                PreparedStatement stmt = connection.prepareStatement(query)) {
+            stmt.setString(1, schema);
+            stmt.setString(2, table);
+
+            try (ResultSet rs = stmt.executeQuery()) {
+                while (rs.next()) {
+                    String columnName = rs.getString("column_name");
+                    String isNullable = rs.getString("is_nullable");
+                    Map<String, Object> columnMetadata = new HashMap<>();
+
+                    columnMetadata.put("ordinal_position", rs.getInt("ordinal_position"));
+                    columnMetadata.put("column_default", rs.getString("column_default"));
+                    columnMetadata.put("column_name", columnName);
+
+                    // is_nullable is left out of the map when Trino does not report a value
+                    if (StringUtils.isNotEmpty(isNullable)) {
+                        columnMetadata.put("is_nullable", StringUtils.equalsIgnoreCase(isNullable, "YES"));
+                    }
+
+                    columnMetadata.put("data_type", rs.getString("data_type"));
+
+                    columns.put(columnName, columnMetadata);
+                }
+            }
+        }
+
+        return columns;
+    }
+
+    /**
+     * Wraps an identifier in double quotes, doubling any embedded double quote, so that it can be
+     * placed in a statement where a bind parameter is not allowed (the catalog qualifier).
+     *
+     * @throws SQLException if the identifier is null or empty
+     */
+    private static String quoteIdentifier(String identifier) throws SQLException {
+        if (StringUtils.isEmpty(identifier)) {
+            throw new SQLException("Trino catalog name must not be empty");
+        }
+
+        return "\"" + identifier.replace("\"", "\"\"") + "\"";
+    }
+
+    /** Opens a new JDBC connection with the configured address and credentials. */
+    private Connection getTrinoConnection() throws SQLException {
+        return DriverManager.getConnection(jdbcUrl, username, password);
+    }
+}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
new file mode 100644
index 000000000..eec4a6736
--- /dev/null
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+
+import java.util.List;
+
+/**
+ * Contract for linking Trino catalog, schema and table entities to related entities in Atlas.
+ *
+ * <p>Instances are handed out by {@link ConnectorFactory#getConnector(String)}.
+ */
+public abstract class AtlasEntityConnector {
+    /**
+     * Links a Trino catalog entity.
+     *
+     * @param atlasClient       helper available to the implementation for Atlas lookups
+     * @param instanceName      instance name available to the implementation for resolving related entities
+     * @param catalogName       name of the Trino catalog
+     * @param entity            Trino catalog entity to link
+     * @param entityWithExtInfo presumably the container that holds {@code entity} and its referred entities; confirm against callers
+     */
+    public abstract void connectTrinoCatalog(AtlasClientHelper atlasClient, String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    /**
+     * Links a Trino schema entity.
+     *
+     * @param atlasClient       helper available to the implementation for Atlas lookups
+     * @param instanceName      instance name available to the implementation for resolving related entities
+     * @param catalogName       name of the Trino catalog that contains the schema
+     * @param schemaName        name of the Trino schema
+     * @param entity            Trino schema entity to link
+     * @param entityWithExtInfo presumably the container that holds {@code entity} and its referred entities; confirm against callers
+     */
+    public abstract void connectTrinoSchema(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+
+    /**
+     * Links a Trino table entity and its column entities.
+     *
+     * @param atlasClient       helper available to the implementation for Atlas lookups
+     * @param instanceName      instance name available to the implementation for resolving related entities
+     * @param catalogName       name of the Trino catalog that contains the table
+     * @param schemaName        name of the Trino schema that contains the table
+     * @param tableName         name of the Trino table
+     * @param entity            Trino table entity to link
+     * @param columnEntities    Trino column entities of the table
+     * @param entityWithExtInfo presumably the container that holds {@code entity} and its referred entities; confirm against callers
+     */
+    public abstract void connectTrinoTable(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo);
+}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
new file mode 100644
index 000000000..f20205e29
--- /dev/null
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the
Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+/**
+ * Maps a Trino connector type name to the {@link AtlasEntityConnector} that handles it.
+ */
+public class ConnectorFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectorFactory.class);
+
+    /**
+     * Creates the connector for the given connector type.
+     *
+     * @param connectorType connector type name, matched case-insensitively; "iceberg" and "hive" are recognized
+     * @return a new connector instance, or null when the type is null or not recognized
+     */
+    public static AtlasEntityConnector getConnector(String connectorType) {
+        if (connectorType == null) {
+            LOG.warn("{}: unrecognized connector type", connectorType);
+
+            return null;
+        }
+
+        // Locale.ROOT keeps the match independent of the JVM default locale (e.g. the Turkish dotless i)
+        switch (connectorType.toLowerCase(Locale.ROOT)) {
+            case "iceberg":
+                return new IcebergEntityConnector();
+            case "hive":
+                return new HiveEntityConnector();
+            default:
+                LOG.warn("{}: unrecognized connector type", connectorType);
+
+                return null;
+        }
+    }
+}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
new file mode 100644
index 000000000..df54d05d4
--- /dev/null
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Links Trino schema, table and column entities to the hive_db, hive_table and hive_column
+ * entities that carry the matching qualified name in Atlas.
+ *
+ * <p>Qualified names are built as {@code schema@instance}, {@code schema.table@instance} and
+ * {@code schema.table.column@instance}.
+ */
+public class HiveEntityConnector extends AtlasEntityConnector {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveEntityConnector.class);
+
+    public static final String HIVE_DB                               = "hive_db";
+    public static final String HIVE_TABLE                            = "hive_table";
+    public static final String HIVE_COLUMN                           = "hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP     = "trino_schema_hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP   = "trino_table_hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP = "trino_column_hive_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE        = "hive_db";
+    public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE      = "hive_table";
+    public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE    = "hive_column";
+
+    /** Does nothing: this connector creates no link at catalog level. */
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo entityWithExtInfo) {
+    }
+
+    /**
+     * Sets the hive_db relationship attribute on the Trino schema entity when a hive_db named
+     * {@code schemaName@instanceName} is found. Lookup failures are logged and not propagated.
+     */
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, schemaName);
+
+                if (hiveDb != null) {
+                    dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    /**
+     * Sets the hive_table relationship attribute on the Trino table entity when a hive_table named
+     * {@code schemaName.tableName@instanceName} is found, then links each column the same way.
+     * Columns are only linked when the table was found. Lookup failures are logged and not propagated.
+     */
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity trinoTable, List<AtlasEntity> columnEntities, AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveTable = toTableEntity(atlasClient, instanceName, schemaName, tableName);
+
+                if (hiveTable != null) {
+                    trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, TRINO_TABLE_HIVE_TABLE_RELATIONSHIP));
+
+                    // a table may be passed without a column list
+                    if (columnEntities != null) {
+                        for (AtlasEntity columnEntity : columnEntities) {
+                            connectTrinoColumn(atlasClient, instanceName, schemaName, tableName, columnEntity);
+                        }
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    /**
+     * Sets the hive_column relationship attribute on a Trino column entity when the matching
+     * hive_column is found. Columns without a "name" attribute are skipped.
+     *
+     * @throws AtlasServiceException if the lookup fails; propagated unchanged to the caller
+     */
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) throws AtlasServiceException {
+        if (instanceName == null || trinoColumn == null) {
+            return;
+        }
+
+        Object columnName = trinoColumn.getAttribute("name");
+
+        if (columnName == null) {
+            LOG.warn("Skipping column without a name in {}.{}", schemaName, tableName);
+
+            return;
+        }
+
+        AtlasEntity hiveColumn = toColumnEntity(atlasClient, instanceName, schemaName, tableName, columnName.toString());
+
+        if (hiveColumn != null) {
+            trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn, TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP));
+        }
+    }
+
+    /** Finds the hive_db named {@code schemaName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret             = atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    /** Finds the hive_table named {@code schemaName.tableName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName) throws AtlasServiceException {
+        String                 tableQualifiedName = schemaName + "." + tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = atlasClient.findEntity(HIVE_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    /** Finds the hive_column named {@code schemaName.tableName.columnName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName, String columnName) throws AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = atlasClient.findEntity(HIVE_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
new file mode 100644
index 000000000..677c7834b
--- /dev/null
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.connector;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Links Trino schema, table and column entities to the hive_db, iceberg_table and iceberg_column
+ * entities that carry the matching qualified name in Atlas.
+ *
+ * <p>Qualified names are built as {@code schema@instance}, {@code schema.table@instance} and
+ * {@code schema.table.column@instance}.
+ */
+public class IcebergEntityConnector extends AtlasEntityConnector {
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergEntityConnector.class);
+
+    public static final String HIVE_DB                                  = "hive_db";
+    public static final String ICEBERG_TABLE                            = "iceberg_table";
+    public static final String ICEBERG_COLUMN                           = "iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP        = "trino_schema_hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP   = "trino_table_iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP = "trino_column_iceberg_column";
+    public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE           = "hive_db";
+    public static final String TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE      = "iceberg_table";
+    public static final String TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE    = "iceberg_column";
+
+    /** Does nothing: this connector creates no link at catalog level. */
+    @Override
+    public void connectTrinoCatalog(AtlasClientHelper atlasClient, String instanceName, String catalogName, AtlasEntity entity, AtlasEntityWithExtInfo entityWithExtInfo) {
+    }
+
+    /**
+     * Sets the hive_db relationship attribute on the Trino schema entity when a hive_db named
+     * {@code schemaName@instanceName} is found. Lookup failures are logged and not propagated.
+     */
+    @Override
+    public void connectTrinoSchema(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity hiveDb = toDbEntity(atlasClient, instanceName, schemaName);
+
+                if (hiveDb != null) {
+                    dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, TRINO_SCHEMA_HIVE_DB_RELATIONSHIP));
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    /**
+     * Sets the iceberg_table relationship attribute on the Trino table entity when an iceberg_table
+     * named {@code schemaName.tableName@instanceName} is found, then links each column the same way.
+     * Columns are only linked when the table was found. Lookup failures are logged and not propagated.
+     */
+    @Override
+    public void connectTrinoTable(AtlasClientHelper atlasClient, String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity trinoTable, List<AtlasEntity> columnEntities, AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (instanceName == null) {
+            LOG.warn("Failed attempting to connect entity since hook namespace is empty, Please configure in properties");
+        } else {
+            try {
+                AtlasEntity icebergTable = toTableEntity(atlasClient, instanceName, schemaName, tableName);
+
+                if (icebergTable != null) {
+                    trinoTable.setRelationshipAttribute(TRINO_TABLE_ICEBERG_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(icebergTable, TRINO_TABLE_ICEBERG_TABLE_RELATIONSHIP));
+
+                    // a table may be passed without a column list
+                    if (columnEntities != null) {
+                        for (AtlasEntity columnEntity : columnEntities) {
+                            connectTrinoColumn(atlasClient, instanceName, schemaName, tableName, columnEntity);
+                        }
+                    }
+                }
+            } catch (AtlasServiceException e) {
+                LOG.error("Error encountered: ", e);
+            }
+        }
+    }
+
+    /**
+     * Sets the iceberg_column relationship attribute on a Trino column entity when the matching
+     * iceberg_column is found. Columns without a "name" attribute are skipped.
+     *
+     * @throws AtlasServiceException if the lookup fails; propagated unchanged to the caller
+     */
+    private void connectTrinoColumn(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) throws AtlasServiceException {
+        if (instanceName == null || trinoColumn == null) {
+            return;
+        }
+
+        Object columnName = trinoColumn.getAttribute("name");
+
+        if (columnName == null) {
+            LOG.warn("Skipping column without a name in {}.{}", schemaName, tableName);
+
+            return;
+        }
+
+        AtlasEntity icebergColumn = toColumnEntity(atlasClient, instanceName, schemaName, tableName, columnName.toString());
+
+        if (icebergColumn != null) {
+            trinoColumn.setRelationshipAttribute(TRINO_COLUMN_ICEBERG_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(icebergColumn, TRINO_COLUMN_ICEBERG_COLUMN_RELATIONSHIP));
+        }
+    }
+
+    /** Finds the hive_db named {@code schemaName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toDbEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName) throws AtlasServiceException {
+        String                 dbQualifiedName = schemaName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret             = atlasClient.findEntity(HIVE_DB, dbQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    /** Finds the iceberg_table named {@code schemaName.tableName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toTableEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName) throws AtlasServiceException {
+        String                 tableQualifiedName = schemaName + "." + tableName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                = atlasClient.findEntity(ICEBERG_TABLE, tableQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+
+    /** Finds the iceberg_column named {@code schemaName.tableName.columnName@instanceName}; returns null when the lookup yields nothing. */
+    private AtlasEntity toColumnEntity(AtlasClientHelper atlasClient, String instanceName, String schemaName, String tableName, String columnName) throws AtlasServiceException {
+        String                 columnQualifiedName = schemaName + "." + tableName + "." + columnName + "@" + instanceName;
+        AtlasEntityWithExtInfo ret                 = atlasClient.findEntity(ICEBERG_COLUMN, columnQualifiedName, true, true);
+
+        return ret != null ? ret.getEntity() : null;
+    }
+}
diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
new file mode 100644
index 000000000..06bba2f48
--- /dev/null
+++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/model/Catalog.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.model;
+
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.trino.connector.AtlasEntityConnector;
+import org.apache.atlas.trino.connector.ConnectorFactory;
+
+/**
+ * Describes one Trino catalog to process: its name and connector type, the instance names, the
+ * optional connector used to link its entities, and optional schema and table import filters.
+ */
+public class Catalog {
+    private final String                 name;
+    private final String                 type;
+    private final String                 hookInstanceName;
+    private final String                 instanceName;
+    private final AtlasEntityConnector   connector;
+    private       AtlasEntityWithExtInfo trinoInstanceEntity;
+    private       String                 schemaToImport;
+    private       String                 tableToImport;
+
+    /**
+     * @param name             catalog name
+     * @param type             connector type of the catalog; selects the connector when the hook is enabled
+     * @param hookEnabled      when false, no connector is created and {@link #getConnector()} returns null
+     * @param hookInstanceName value returned by {@link #getHookInstanceName()}
+     * @param instanceName     value returned by {@link #getInstanceName()}
+     */
+    public Catalog(String name, String type, boolean hookEnabled, String hookInstanceName, String instanceName) {
+        this.name             = name;
+        this.type             = type;
+        this.hookInstanceName = hookInstanceName;
+        this.instanceName     = instanceName;
+        // a catalog without a type cannot be matched to a connector, so do not ask the factory for one
+        this.connector        = (hookEnabled && type != null) ? ConnectorFactory.getConnector(type) : null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    /** Returns the connector for this catalog, or null when the hook is disabled or the type has no connector. */
+    public AtlasEntityConnector getConnector() {
+        return connector;
+    }
+
+    public String getHookInstanceName() {
+        return hookInstanceName;
+    }
+
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    public AtlasEntityWithExtInfo getTrinoInstanceEntity() {
+        return trinoInstanceEntity;
+    }
+
+    public void setTrinoInstanceEntity(AtlasEntityWithExtInfo trinoInstanceEntity) {
+        this.trinoInstanceEntity = trinoInstanceEntity;
+    }
+
+    public String getTableToImport() {
+        return tableToImport;
+    }
+
+    public void setTableToImport(String tableToImport) {
+        this.tableToImport = tableToImport;
+    }
+
+    public String getSchemaToImport() {
+        return schemaToImport;
+    }
+
+    public void setSchemaToImport(String schemaToImport) {
+        this.schemaToImport = schemaToImport;
+    }
+}
diff --git a/addons/trino-extractor/src/main/resources/atlas-logback.xml b/addons/trino-extractor/src/main/resources/atlas-logback.xml
new file mode 100644
index 000000000..2a616c524
--- /dev/null
+++ b/addons/trino-extractor/src/main/resources/atlas-logback.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> + <param name="Target" value="System.out"/> + <encoder> + <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${atlas.log.dir}/${atlas.log.file}</file> + <append>true</append> + <encoder> + <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern> + <maxHistory>20</maxHistory> + <cleanHistoryOnStart>true</cleanHistoryOnStart> + </rollingPolicy> + </appender> + + <logger name="org.apache.atlas.trino" additivity="false" level="info"> + <appender-ref ref="FILE"/> + </logger> + + <root level="warn"> + <appender-ref ref="FILE"/> + </root> +</configuration> diff --git a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java new file mode 100644 index 000000000..7e94f381b --- /dev/null +++ b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +public class TrinoExtractorIT { + + /* List of testcases + Invalid Arguments + Invalid cron expression + Test valid Catalog to be run + Test Instance creation + Test catalog creation + Test schema creation + Test table creation + Test if hook is enabled, hook entity, if created, is connected to Trino entity + Test cron doesn't trigger new job, before earlier thread completes + Test without cron expression + Test even if catalog is not registered, it should run if passed from commandLine + Deleted table + Deleted catalog + Deleted column + Deleted schema + Rename catalog + Rename schema + Tag propagated*/ + +} diff --git a/distro/pom.xml b/distro/pom.xml index af0d01060..d7e958889 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -276,6 +276,7 @@ atlas.graph.storage.hbase.regions-per-server=1 <!--<descriptor>src/main/assemblies/migration-exporter.xml</descriptor>--> <descriptor>src/main/assemblies/classification-updater.xml</descriptor> <descriptor>src/main/assemblies/notification-analyzer.xml</descriptor> + <descriptor>src/main/assemblies/atlas-trino-extractor.xml</descriptor> </descriptors> <finalName>apache-atlas-${project.version}</finalName> <tarLongFileMode>gnu</tarLongFileMode> diff --git a/distro/src/main/assemblies/atlas-trino-extractor.xml b/distro/src/main/assemblies/atlas-trino-extractor.xml new file mode 100644 index 000000000..923c8dc95 --- /dev/null +++ b/distro/src/main/assemblies/atlas-trino-extractor.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to 
the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <formats> + <format>tar.gz</format> + </formats> + <id>trino-extractor</id> + <baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory> + <fileSets> + <fileSet> + <includes> + <include>README*</include> + </includes> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/src/main/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>atlas-log4j.xml</include> + <include>atlas-application.properties</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/src/main/bin</directory> + <outputDirectory>/bin</outputDirectory> + <includes> + <include>run-trino-extractor.sh</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + 
<directory>../addons/trino-extractor/target/dependency/trino</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/target</directory> + <outputDirectory>/lib</outputDirectory> + <includes> + <include>atlas-trino-extractor-*.jar</include> + </includes> + </fileSet> + </fileSets> + +</assembly> diff --git a/pom.xml b/pom.xml index 9341140d4..ac2d4df6c 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ <module>addons/sqoop-bridge-shim</module> <module>addons/storm-bridge</module> <module>addons/storm-bridge-shim</module> + <module>addons/trino-extractor</module> <module>atlas-examples</module> <module>authorization</module> <module>build-tools</module>
