This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch ATLAS-5021 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 5802e7c148b2e9ef1f1af949e40326b6a059be87 Author: Pinal Shah <[email protected]> AuthorDate: Tue Jul 22 10:29:40 2025 +0530 ATLAS-5021: Extract Metadata from Trino periodically --- addons/models/6000-Trino/6000-trino_model.json | 28 +++-- addons/trino-extractor/pom.xml | 10 +- .../apache/atlas/trino/cli/ExtractorService.java | 10 +- .../org/apache/atlas/trino/cli/TrinoExtractor.java | 46 +++++--- .../atlas/trino/client/AtlasClientHelper.java | 10 +- .../atlas/trino/client/TrinoClientHelper.java | 18 +-- .../trino/connector/IcebergEntityConnector.java | 124 +++++++++++++++++++++ .../src/main/resources/atlas-logback.xml | 51 +++++++++ 8 files changed, 251 insertions(+), 46 deletions(-) diff --git a/addons/models/6000-Trino/6000-trino_model.json b/addons/models/6000-Trino/6000-trino_model.json index 66dee5e1e..1ab71c42d 100644 --- a/addons/models/6000-Trino/6000-trino_model.json +++ b/addons/models/6000-Trino/6000-trino_model.json @@ -186,14 +186,6 @@ "serviceType": "trino", "typeVersion" : "1.0", "attributeDefs" : [ - { - "name": "depenendencyType", - "typeName": "string", - "cardinality" : "SINGLE", - "isIndexable": false, - "isOptional": false, - "isUnique": false - }, { "name": "expression", "typeName": "string", @@ -386,6 +378,26 @@ "cardinality": "SINGLE" }, "propagateTags": "NONE" + }, + { + "name": "trino_process_column_lineage", + "serviceType": "trino", + "typeVersion": "1.0", + "relationshipCategory": "COMPOSITION", + "endDef1": { + "type": "trino_column_lineage", + "name": "query", + "isContainer": false, + "cardinality": "SINGLE", + "isLegacyAttribute": true + }, + "endDef2": { + "type": "trino_process", + "name": "columnLineages", + "isContainer": true, + "cardinality": "SET" + }, + "propagateTags": "NONE" }, { "name": "trino_schema_hive_db", diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml index 7ef285c7e..fe6dc8293 100644 --- a/addons/trino-extractor/pom.xml +++ b/addons/trino-extractor/pom.xml @@ -33,6 +33,10 @@ <description>Apache Atlas Trino Bridge Module</description> <dependencies> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </dependency> <dependency> <groupId>com.sun.jersey</groupId> <artifactId>jersey-client</artifactId> @@ -58,12 +62,6 @@ <artifactId>quartz</artifactId> <version>2.3.2</version> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.30</version> - </dependency> - </dependencies> <build> <plugins> diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java index ca64c0d0c..e6f4d6ca8 100644 --- a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java @@ -166,13 +166,13 @@ public class ExtractorService { try { AtlasEntity.AtlasEntityWithExtInfo schemaEntity = AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); - List<String> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + Map<String, Map<String, Object>> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); LOG.info("Found {} tables under {}.{} catalog.schema", tables.size(), catalog.getName(), schemaName); processTables(catalog, schemaName, schemaEntity.getEntity(), tables); if (StringUtils.isEmpty(context.getTable())) { - deleteTables(tables, schemaEntity.getEntity().getGuid()); + deleteTables(new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid()); } } catch (Exception e) { LOG.error("Error processing schema: {}", schemaName); @@ -180,15 +180,15 @@ public class ExtractorService { } } - public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, List<String> trinoTables) { - for (String trinoTableName : trinoTables) { + public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>> trinoTables) { + for (String trinoTableName : trinoTables.keySet()) { LOG.info("Started extracting {} table:", trinoTableName); try { Map<String, Map<String, Object>> trinoColumns = trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, trinoTableName); LOG.info("Found {} columns under {}.{}.{} catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName); - AtlasClientHelper.createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoColumns, schemaEntity); + AtlasClientHelper.createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, schemaEntity); } catch (Exception e) { LOG.error("Error processing table: {}", trinoTableName, e); } diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java index 4a0ebe085..eac5637c8 100644 --- a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java @@ -35,6 +35,7 @@ import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.Scheduler; +import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.Trigger; import org.quartz.TriggerBuilder; @@ -67,28 +68,36 @@ public class TrinoExtractor { private static ExtractorContext extractorContext; public static void main(String[] args) { + Scheduler scheduler = null; try { extractorContext = createExtractorContext(args); if (extractorContext != null) { String cronExpression = extractorContext.getCronExpression(); if (StringUtils.isNotEmpty(cronExpression)) { - if (!CronExpression.isValidExpression(cronExpression)) { LOG.error("Invalid cron expression provided: {}", cronExpression); - } else { - LOG.info("Cron Expression found, scheduling the job for {}", cronExpression); - SchedulerFactory sf = new StdSchedulerFactory(); - Scheduler scheduler = sf.getScheduler(); - - JobDetail job = JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", "group1").build(); - Trigger trigger = TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow() - .build(); - - scheduler.scheduleJob(job, trigger); - scheduler.start(); + exitCode = EXIT_CODE_FAILED; + System.exit(exitCode); + } + + LOG.info("Cron Expression found, scheduling the job for {}", cronExpression); + SchedulerFactory sf = new StdSchedulerFactory(); + scheduler = sf.getScheduler(); + + JobDetail job = JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", "group1").build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow() + .build(); + + scheduler.scheduleJob(job, trigger); + scheduler.start(); + try { Thread.currentThread().join(); + } catch (InterruptedException ie) { + LOG.info("Main thread interrupted, shutting down scheduler."); + scheduler.shutdown(true); + exitCode = EXIT_CODE_SUCCESS; } } else { LOG.warn("Cron Expression missing, hence will run the job once"); @@ -99,13 +108,18 @@ public class TrinoExtractor { } } } catch (Exception e) { - LOG.error("Error encountered.", e); - System.out.println("exitCode : " + exitCode); - System.out.println("Error encountered." + e); + LOG.error("Error encountered. exitCode: {}", exitCode, e); } finally { if (extractorContext != null && extractorContext.getAtlasConnector() != null) { extractorContext.getAtlasConnector().close(); } + try { + if (scheduler != null && !scheduler.isShutdown()) { + scheduler.shutdown(true); + } + } catch (SchedulerException se) { + LOG.warn("Error shutting down scheduler: {}", se.getMessage()); + } } System.exit(exitCode); diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java index 5ff2ff2a9..1b597fd0f 100644 --- a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java @@ -72,6 +72,7 @@ public class AtlasClientHelper { private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE = "column_default"; private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE = "is_nullable"; private static final String TRINO_COLUMN_TABLE_ATTRIBUTE = "table"; + private static final String TRINO_TABLE_TYPE = "table_type"; private static final String TRINO_TABLE_COLUMN_RELATIONSHIP = "trino_table_columns"; private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP = "trino_table_schema"; private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE = "trinoschema"; @@ -259,24 +260,24 @@ public class AtlasClientHelper { return ret; } - public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, String schema, String table, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws AtlasServiceException { + public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws AtlasServiceException { String qualifiedName = String.format("%s.%s.%s@%s", catalog.getName(), schema, table, catalog.getInstanceName()); AtlasEntity.AtlasEntityWithExtInfo ret; AtlasEntity.AtlasEntityWithExtInfo tableEntityExt = findEntity(TRINO_TABLE, qualifiedName, true, true); if (tableEntityExt == null) { - tableEntityExt = toTableEntity(catalog, schema, table, trinoColumns, schemaEntity, tableEntityExt); + tableEntityExt = toTableEntity(catalog, schema, table, tableMetadata, trinoColumns, schemaEntity, tableEntityExt); ret = createEntity(tableEntityExt); } else { - ret = toTableEntity(catalog, schema, table, trinoColumns, schemaEntity, tableEntityExt); + ret = toTableEntity(catalog, schema, table, tableMetadata, trinoColumns, schemaEntity, tableEntityExt); updateEntity(ret); } return ret; } - public static AtlasEntity.AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String schema, String table, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, AtlasEntity.AtlasEntityWithExtInfo tableEntityExt) { + public static AtlasEntity.AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String schema, String table, Map<String, Object> tableMetadata, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, AtlasEntity.AtlasEntityWithExtInfo tableEntityExt) { if (tableEntityExt == null) { tableEntityExt = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(TRINO_TABLE)); } @@ -286,6 +287,7 @@ public class AtlasClientHelper { AtlasEntity tableEntity = tableEntityExt.getEntity(); tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName); tableEntity.setAttribute(NAME_ATTRIBUTE, table); + tableEntity.setAttribute(TRINO_TABLE_TYPE, tableMetadata.get(TRINO_TABLE_TYPE).toString()); List<AtlasEntity> columnEntities = new ArrayList<>(); for (Map.Entry<String, Map<String, Object>> columnEntry : trinoColumns.entrySet()) { diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java index ef24cb6e4..bc794dae6 100644 --- a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java @@ -84,19 +84,23 @@ public class TrinoClientHelper { return schemas; } - public List<String> getTrinoTables(String catalog, String schema, String tableToImport) throws SQLException { - List<String> tables = new ArrayList<>(); - Connection connection = getTrinoConnection(); - Statement stmt = connection.createStatement(); - StringBuilder query = new StringBuilder(); - query.append("SELECT table_name FROM " + catalog + ".information_schema.tables WHERE table_schema = '" + schema + "'"); + public Map<String, Map<String, Object>> getTrinoTables(String catalog, String schema, String tableToImport) throws SQLException { + Map<String, Map<String, Object>> tables = new HashMap<>(); + Connection connection = getTrinoConnection(); + Statement stmt = connection.createStatement(); + StringBuilder query = new StringBuilder(); + query.append("SELECT table_name, table_type FROM " + catalog + ".information_schema.tables WHERE table_schema = '" + schema + "'"); if (StringUtils.isNotEmpty(tableToImport)) { query.append(" and table_name = '" + tableToImport + "'"); } ResultSet rs = stmt.executeQuery(query.toString()); while (rs.next()) { - tables.add(rs.getString("table_name")); + Map<String, Object> tableMetadata = new HashMap<>(); + tableMetadata.put("table_name", rs.getString("table_name")); + tableMetadata.put("table_type", rs.getString("table_type")); + + tables.put(rs.getString("table_name"), tableMetadata); } return tables; diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java new file mode 100644 index 000000000..bba2052b8 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/IcebergEntityConnector.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.connector; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.type.AtlasTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class IcebergEntityConnector extends AtlasEntityConnector { + private static final Logger LOG = LoggerFactory.getLogger(IcebergEntityConnector.class); + + public static final String HIVE_INSTANCE = "hms_instance"; + public static final String HIVE_DB = "hive_db"; + public static final String HIVE_TABLE = "hive_table"; + public static final String HIVE_COLUMN = "hive_column"; + public static final String TRINO_SCHEMA_HIVE_DB_RELATIONSHIP = "trino_schema_hive_db"; + public static final String TRINO_TABLE_HIVE_TABLE_RELATIONSHIP = "trino_table_hive_table"; + public static final String TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP = "trino_column_hive_column"; + public static final String TRINO_SCHEMA_HIVE_DB_ATTRIBUTE = "hive_db"; + public static final String TRINO_TABLE_HIVE_TABLE_ATTRIBUTE = "hive_table"; + public static final String TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE = "hive_column"; + @Override + public void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + + } + + @Override + public void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity dbEntity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (instanceName == null) { + LOG.warn("Failed attempting to connect entity since hook namespace is empty, Please configure in properties"); + return; + } + + AtlasEntity hiveDb = null; + try { + hiveDb = toDbEntity(instanceName, schemaName); + } catch (AtlasServiceException e) { + LOG.error("Error encountered: ", e); + } + + if (hiveDb != null) { + dbEntity.setRelationshipAttribute(TRINO_SCHEMA_HIVE_DB_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, TRINO_SCHEMA_HIVE_DB_RELATIONSHIP)); + } + } + + @Override + public void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity trinoTable, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + if (instanceName == null) { + LOG.warn("Failed attempting to entity since hook namespace is empty, Please configure in properties"); + return; + } + + AtlasEntity hiveTable; + try { + hiveTable = toTableEntity(instanceName, schemaName, tableName); + + if (hiveTable != null) { + trinoTable.setRelationshipAttribute(TRINO_TABLE_HIVE_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, TRINO_TABLE_HIVE_TABLE_RELATIONSHIP)); + + for (AtlasEntity columnEntity : columnEntities) { + connectTrinoColumn(instanceName, schemaName, tableName, columnEntity); + } + } + } catch (AtlasServiceException e) { + LOG.error("Error encountered: ", e); + } + } + + public void connectTrinoColumn(String instanceName, String schemaName, String tableName, AtlasEntity trinoColumn) throws AtlasServiceException { + if (instanceName == null) { + return; + } + + AtlasEntity hiveColumn; + try { + hiveColumn = toColumnEntity(instanceName, schemaName, tableName, trinoColumn.getAttribute("name").toString()); + } catch (AtlasServiceException e) { throw new AtlasServiceException(e); + } + if (hiveColumn != null) { + trinoColumn.setRelationshipAttribute(TRINO_COLUMN_HIVE_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn, TRINO_COLUMN_HIVE_COLUMN_RELATIONSHIP)); + } + } + + private AtlasEntity toDbEntity(String instanceName, String schemaName) throws AtlasServiceException { + String dbQualifiedName = schemaName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_DB, dbQualifiedName, true, true); + + return ret != null ? ret.getEntity() : null; + } + + private AtlasEntity toTableEntity(String instanceName, String schemaName, String tableName) throws AtlasServiceException { + String tableQualifiedName = schemaName + "." + tableName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_TABLE, tableQualifiedName, true, true); + + return ret != null ? ret.getEntity() : null; + } + + private AtlasEntity toColumnEntity(String instanceName, String schemaName, String tableName, String columnName) throws AtlasServiceException { + String columnQualifiedName = schemaName + "." + tableName + "." + columnName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_COLUMN, columnQualifiedName, true, true); + + return ret != null ? ret.getEntity() : null; + } +} diff --git a/addons/trino-extractor/src/main/resources/atlas-logback.xml b/addons/trino-extractor/src/main/resources/atlas-logback.xml new file mode 100644 index 000000000..2a616c524 --- /dev/null +++ b/addons/trino-extractor/src/main/resources/atlas-logback.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> + <param name="Target" value="System.out"/> + <encoder> + <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>${atlas.log.dir}/${atlas.log.file}</file> + <append>true</append> + <encoder> + <pattern>%date [%thread] %level{5} [%file:%line] %msg%n</pattern> + </encoder> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${atlas.log.dir}/${atlas.log.file}-%d</fileNamePattern> + <maxHistory>20</maxHistory> + <cleanHistoryOnStart>true</cleanHistoryOnStart> + </rollingPolicy> + </appender> + + <logger name="org.apache.atlas.trino" additivity="false" level="info"> + <appender-ref ref="FILE"/> + </logger> + + <root level="warn"> + <appender-ref ref="FILE"/> + </root> +</configuration>
