This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 8b50ac0 ATLAS-3836 Add Apache Ozone support in hive hook
8b50ac0 is described below
commit 8b50ac0c7d8eba8efa0deb97f1fee68d382f11ad
Author: Nikhil Bonte <[email protected]>
AuthorDate: Fri Jun 26 12:20:09 2020 +0530
ATLAS-3836 Add Apache Ozone support in hive hook
Signed-off-by: Sarath Subramanian <[email protected]>
---
.../atlas/hive/hook/AtlasHiveHookContext.java | 5 +-
.../java/org/apache/atlas/hive/hook/HiveHook.java | 8 +-
.../atlas/hive/hook/events/BaseHiveEvent.java | 309 +------------
.../apache/atlas/utils/AtlasPathExtractorUtil.java | 493 +++++++++++++++++++++
.../apache/atlas/utils/PathExtractorContext.java | 74 ++++
.../atlas/utils/AtlasPathExtractorUtilTest.java | 176 ++++++++
6 files changed, 763 insertions(+), 302 deletions(-)
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
index d0b9393..1286471 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java
@@ -157,6 +157,7 @@ public class AtlasHiveHookContext {
public Collection<AtlasEntity> getEntities() { return
qNameEntityMap.values(); }
+ public Map<String, AtlasEntity> getQNameToEntityMap() { return
qNameEntityMap; }
public String getMetadataNamespace() {
return hook.getMetadataNamespace();
@@ -168,8 +169,8 @@ public class AtlasHiveHookContext {
return hook.isConvertHdfsPathToLowerCase();
}
- public boolean isAwsS3AtlasModelVersionV2() {
- return hook.isAwsS3AtlasModelVersionV2();
+ public String getAwsS3AtlasModelVersion() {
+ return hook.getAwsS3AtlasModelVersion();
}
public boolean getSkipHiveColumnLineageHive20633() {
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 3aa5c3b..6513234 100644
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -77,7 +77,7 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
private static final int nameCacheDatabaseMaxCount;
private static final int nameCacheTableMaxCount;
private static final int nameCacheRebuildIntervalSeconds;
- private static final boolean isAwsS3AtlasModelVersionV2;
+ private static final String awsS3AtlasModelVersion;
private static final boolean
skipHiveColumnLineageHive20633;
private static final int
skipHiveColumnLineageHive20633InputsThreshold;
@@ -101,7 +101,7 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
nameCacheDatabaseMaxCount =
atlasProperties.getInt(HOOK_NAME_CACHE_DATABASE_COUNT, 10000);
nameCacheTableMaxCount =
atlasProperties.getInt(HOOK_NAME_CACHE_TABLE_COUNT, 10000);
nameCacheRebuildIntervalSeconds =
atlasProperties.getInt(HOOK_NAME_CACHE_REBUID_INTERVAL_SEC, 60 * 60); // 60
minutes default
- isAwsS3AtlasModelVersionV2 =
StringUtils.equalsIgnoreCase(atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION,
HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2), HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
+ awsS3AtlasModelVersion =
atlasProperties.getString(HOOK_AWS_S3_ATLAS_MODEL_VERSION,
HOOK_AWS_S3_ATLAS_MODEL_VERSION_V2);
skipHiveColumnLineageHive20633 =
atlasProperties.getBoolean(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
skipHiveColumnLineageHive20633InputsThreshold =
atlasProperties.getInt(HOOK_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
15); // skip if avg # of inputs is > 15
hiveProcessPopulateDeprecatedAttributes =
atlasProperties.getBoolean(HOOK_HIVE_PROCESS_POPULATE_DEPRECATED_ATTRIBUTES,
false);
@@ -257,7 +257,9 @@ public class HiveHook extends AtlasHook implements
ExecuteWithHookContext {
return convertHdfsPathToLowerCase;
}
- public boolean isAwsS3AtlasModelVersionV2() { return
isAwsS3AtlasModelVersionV2; }
+ public String getAwsS3AtlasModelVersion() {
+ return awsS3AtlasModelVersion;
+ }
public boolean getSkipHiveColumnLineageHive20633() {
return skipHiveColumnLineageHive20633;
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
index 6986ba1..2b809a6 100644
---
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
+++
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java
@@ -20,6 +20,7 @@ package org.apache.atlas.hive.hook.events;
import org.apache.atlas.hive.hook.AtlasHiveHookContext;
import org.apache.atlas.hive.hook.HiveHook.PreprocessAction;
+import org.apache.atlas.utils.PathExtractorContext;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -29,6 +30,7 @@ import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -73,25 +75,8 @@ public abstract class BaseHiveEvent {
public static final String HIVE_TYPE_PROCESS_EXECUTION =
"hive_process_execution";
public static final String HIVE_DB_DDL =
"hive_db_ddl";
public static final String HIVE_TABLE_DDL =
"hive_table_ddl";
- public static final String HDFS_TYPE_PATH =
"hdfs_path";
public static final String HBASE_TYPE_TABLE =
"hbase_table";
public static final String HBASE_TYPE_NAMESPACE =
"hbase_namespace";
- public static final String AWS_S3_BUCKET =
"aws_s3_bucket";
- public static final String AWS_S3_PSEUDO_DIR =
"aws_s3_pseudo_dir";
- public static final String AWS_S3_OBJECT =
"aws_s3_object";
- public static final String AWS_S3_V2_BUCKET =
"aws_s3_v2_bucket";
- public static final String AWS_S3_V2_PSEUDO_DIR =
"aws_s3_v2_directory";
- public static final String ADLS_GEN2_ACCOUNT =
"adls_gen2_account";
- public static final String ADLS_GEN2_CONTAINER =
"adls_gen2_container";
- public static final String ADLS_GEN2_DIRECTORY =
"adls_gen2_directory";
- public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX =
".dfs.core.windows.net";
-
- public static final String SCHEME_SEPARATOR = "://";
- public static final String S3_SCHEME = "s3" +
SCHEME_SEPARATOR;
- public static final String S3A_SCHEME = "s3a" +
SCHEME_SEPARATOR;
- public static final String ABFS_SCHEME = "abfs" +
SCHEME_SEPARATOR;
- public static final String ABFSS_SCHEME = "abfss" +
SCHEME_SEPARATOR;
-
public static final String ATTRIBUTE_QUALIFIED_NAME =
"qualifiedName";
public static final String ATTRIBUTE_NAME = "name";
public static final String ATTRIBUTE_DESCRIPTION =
"description";
@@ -145,16 +130,10 @@ public abstract class BaseHiveEvent {
public static final String ATTRIBUTE_URI = "uri";
public static final String ATTRIBUTE_STORAGE_HANDLER =
"storage_handler";
public static final String ATTRIBUTE_NAMESPACE =
"namespace";
- public static final String ATTRIBUTE_OBJECT_PREFIX =
"objectPrefix";
- public static final String ATTRIBUTE_BUCKET = "bucket";
- public static final String ATTRIBUTE_CONTAINER =
"container";
public static final String ATTRIBUTE_HOSTNAME =
"hostName";
public static final String ATTRIBUTE_EXEC_TIME =
"execTime";
public static final String ATTRIBUTE_DDL_QUERIES =
"ddlQueries";
public static final String ATTRIBUTE_SERVICE_TYPE =
"serviceType";
- public static final String ATTRIBUTE_ACCOUNT = "account";
- public static final String ATTRIBUTE_PARENT = "parent";
-
public static final String HBASE_STORAGE_HANDLER_CLASS =
"org.apache.hadoop.hive.hbase.HBaseStorageHandler";
public static final String HBASE_DEFAULT_NAMESPACE = "default";
public static final String HBASE_NAMESPACE_TABLE_DELIMITER = ":";
@@ -170,14 +149,10 @@ public abstract class BaseHiveEvent {
public static final String RELATIONSHIP_HIVE_TABLE_PART_KEYS =
"hive_table_partitionkeys";
public static final String RELATIONSHIP_HIVE_TABLE_COLUMNS =
"hive_table_columns";
public static final String RELATIONSHIP_HIVE_TABLE_STORAGE_DESC =
"hive_table_storagedesc";
- public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
- public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED =
"aws_s3_v2_container_contained";
public static final String RELATIONSHIP_HIVE_PROCESS_PROCESS_EXE =
"hive_process_process_executions";
public static final String RELATIONSHIP_HIVE_DB_DDL_QUERIES =
"hive_db_ddl_queries";
public static final String RELATIONSHIP_HIVE_TABLE_DDL_QUERIES =
"hive_table_ddl_queries";
public static final String RELATIONSHIP_HBASE_TABLE_NAMESPACE =
"hbase_table_namespace";
- public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS =
"adls_gen2_account_containers";
- public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN =
"adls_gen2_parent_children";
public static final Map<Integer, String> OWNER_TYPE_TO_ENUM_VALUE = new
HashMap<>();
@@ -584,52 +559,21 @@ public abstract class BaseHiveEvent {
}
protected AtlasEntity getPathEntity(Path path, AtlasEntityExtInfo extInfo)
{
- AtlasEntity ret;
- String strPath = path.toString();
- String metadataNamespace = getMetadataNamespace();
-
- if (strPath.startsWith(HDFS_PATH_PREFIX) &&
context.isConvertHdfsPathToLowerCase()) {
- strPath = strPath.toLowerCase();
- }
-
- if (isS3Path(strPath)) {
- if (context.isAwsS3AtlasModelVersionV2()) {
- ret = addS3PathEntityV2(path, strPath, extInfo);
- } else {
- ret = addS3PathEntityV1(path, strPath, extInfo);
- }
- } else if (isAbfsPath(strPath)) {
- ret = addAbfsPathEntity(path, strPath, extInfo);
- } else {
- String nameServiceID =
HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
- String attrPath = StringUtils.isEmpty(nameServiceID) ?
strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
- String pathQualifiedName = getQualifiedName(attrPath);
-
- ret = context.getEntity(pathQualifiedName);
-
- if (ret == null) {
- ret = new AtlasEntity(HDFS_TYPE_PATH);
-
- if (StringUtils.isNotEmpty(nameServiceID)) {
- ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
- }
-
- String name =
Path.getPathWithoutSchemeAndAuthority(path).toString();
+ String strPath = path.toString();
+ String metadataNamespace = getMetadataNamespace();
+ boolean isConvertPathToLowerCase =
strPath.startsWith(HDFS_PATH_PREFIX) && context.isConvertHdfsPathToLowerCase();
+ PathExtractorContext pathExtractorContext = new
PathExtractorContext(metadataNamespace, context.getQNameToEntityMap(),
+
isConvertPathToLowerCase, context.getAwsS3AtlasModelVersion());
- if (strPath.startsWith(HDFS_PATH_PREFIX) &&
context.isConvertHdfsPathToLowerCase()) {
- name = name.toLowerCase();
- }
-
- ret.setAttribute(ATTRIBUTE_PATH, attrPath);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME, name);
- ret.setAttribute(ATTRIBUTE_CLUSTER_NAME, metadataNamespace);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, pathExtractorContext);
- context.putEntity(pathQualifiedName, ret);
+ if (entityWithExtInfo.getReferredEntities() != null){
+ for (AtlasEntity entity :
entityWithExtInfo.getReferredEntities().values()) {
+ extInfo.addReferredEntity(entity);
}
}
- return ret;
+ return entityWithExtInfo.getEntity();
}
protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs,
List<AtlasEntity> outputs) throws Exception {
@@ -1145,235 +1089,6 @@ public abstract class BaseHiveEvent {
return false;
}
- private boolean isS3Path(String strPath) {
- return strPath != null && (strPath.startsWith(S3_SCHEME) ||
strPath.startsWith(S3A_SCHEME));
- }
-
- private boolean isAbfsPath(String strPath) {
- return strPath != null && (strPath.startsWith(ABFS_SCHEME) ||
strPath.startsWith(ABFSS_SCHEME));
- }
-
- private AtlasEntity addS3PathEntityV1(Path path, String strPath,
AtlasEntityExtInfo extInfo) {
- String metadataNamespace = getMetadataNamespace();
- String bucketName = path.toUri().getAuthority();
- String bucketQualifiedName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + path.toUri().getAuthority() +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
- String pathQualifiedName = (strPath +
QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + metadataNamespace;
- AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName);
- AtlasEntity ret = context.getEntity(pathQualifiedName);
-
- if (ret == null) {
- if (bucketEntity == null) {
- bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
-
- bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
bucketQualifiedName);
- bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
- context.putEntity(bucketQualifiedName, bucketEntity);
- }
-
- extInfo.addReferredEntity(bucketEntity);
-
- ret = new AtlasEntity(AWS_S3_PSEUDO_DIR);
-
- ret.setRelationshipAttribute(ATTRIBUTE_BUCKET,
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME,
Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());
-
- context.putEntity(pathQualifiedName, ret);
- }
-
- return ret;
- }
-
- private AtlasEntity addS3PathEntityV2(Path path, String strPath,
AtlasEntityExtInfo extInfo) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
- }
-
- String metadataNamespace = getMetadataNamespace();
- String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace;
- AtlasEntity ret = context.getEntity(pathQualifiedName);
-
- if (ret == null) {
- String bucketName = path.toUri().getAuthority();
- String schemeAndBucketName = (path.toUri().getScheme() +
SCHEME_SEPARATOR + bucketName).toLowerCase();
- String bucketQualifiedName = schemeAndBucketName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
- AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName);
-
- if (bucketEntity == null) {
- bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
-
- bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
bucketQualifiedName);
- bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding entity: typeName={}, qualifiedName={}",
bucketEntity.getTypeName(),
bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
-
- context.putEntity(bucketQualifiedName, bucketEntity);
- }
-
- extInfo.addReferredEntity(bucketEntity);
-
- AtlasRelatedObjectId parentObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
- String parentPath = Path.SEPARATOR;
- String dirPath = path.toUri().getPath();
-
- if (StringUtils.isEmpty(dirPath)) {
- dirPath = Path.SEPARATOR;
- }
-
- for (String subDirName : dirPath.split(Path.SEPARATOR)) {
- if (StringUtils.isEmpty(subDirName)) {
- continue;
- }
-
- String subDirPath = parentPath + subDirName +
Path.SEPARATOR;
- String subDirQualifiedName = schemeAndBucketName + subDirPath
+ QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-
- ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
-
- ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
- ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME, subDirName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding entity: typeName={}, qualifiedName={}",
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
-
- context.putEntity(subDirQualifiedName, ret);
-
- parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret,
RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
- parentPath = subDirPath;
- }
-
- if (ret == null) {
- ret = bucketEntity;
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
- }
-
- return ret;
- }
-
- private AtlasEntity addAbfsPathEntity(Path path, String strPath,
AtlasEntityExtInfo extInfo) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
- }
-
- String metadataNamespace = getMetadataNamespace();
- String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace;
- AtlasEntity ret = context.getEntity(pathQualifiedName);
-
- if (ret == null) {
- String abfsScheme = path.toUri().getScheme();
- String storageAcctName =
getAbfsStorageAccountName(path.toUri());
- String schemeAndStorageAcctName = (abfsScheme +
SCHEME_SEPARATOR + storageAcctName).toLowerCase();
- String storageAcctQualifiedName = schemeAndStorageAcctName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
- AtlasEntity storageAcctEntity =
context.getEntity(storageAcctQualifiedName);
-
- // create adls-gen2 storage-account entity
- if (storageAcctEntity == null) {
- storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
-
- storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
storageAcctQualifiedName);
- storageAcctEntity.setAttribute(ATTRIBUTE_NAME,
storageAcctName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding entity: typeName={}, qualifiedName={}",
storageAcctEntity.getTypeName(),
storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
-
- context.putEntity(storageAcctQualifiedName, storageAcctEntity);
- }
-
- extInfo.addReferredEntity(storageAcctEntity);
-
- AtlasRelatedObjectId storageAcctObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity,
RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
-
- // create adls-gen2 container entity linking to storage account
- String containerName = path.toUri().getUserInfo();
- String schemeAndContainerName = (abfsScheme +
SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE +
storageAcctName).toLowerCase();
- String containerQualifiedName = schemeAndContainerName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
- AtlasEntity containerEntity =
context.getEntity(containerQualifiedName);
-
- if (containerEntity == null) {
- containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
-
- containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
containerQualifiedName);
- containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
- containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT,
storageAcctObjId);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding entity: typeName={}, qualifiedName={}",
containerEntity.getTypeName(),
containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
-
- context.putEntity(containerQualifiedName, containerEntity);
- }
-
- extInfo.addReferredEntity(containerEntity);
-
- // create adls-gen2 directory entity linking to container
- AtlasRelatedObjectId parentObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity,
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
- String parentPath = Path.SEPARATOR;
- String dirPath = path.toUri().getPath();
-
- if (StringUtils.isEmpty(dirPath)) {
- dirPath = Path.SEPARATOR;
- }
-
- for (String subDirName : dirPath.split(Path.SEPARATOR)) {
- if (StringUtils.isEmpty(subDirName)) {
- continue;
- }
-
- String subDirPath = parentPath + subDirName +
Path.SEPARATOR;
- String subDirQualifiedName = schemeAndContainerName +
subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
-
- ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);
-
- ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
- ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
- ret.setAttribute(ATTRIBUTE_NAME, subDirName);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("adding entity: typeName={}, qualifiedName={}",
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
- }
-
- context.putEntity(subDirQualifiedName, ret);
-
- parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret,
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
- parentPath = subDirPath;
- }
-
- if (ret == null) {
- ret = storageAcctEntity;
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
- }
-
- return ret;
- }
-
- private String getAbfsStorageAccountName(URI uri) {
- String ret = null;
- String host = uri.getHost();
-
- // host: "<account_name>.dfs.core.windows.net"
- if (StringUtils.isNotEmpty(host) &&
host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
- ret = host.substring(0,
host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX));
- }
-
- return ret;
- }
-
static final class EntityComparator implements Comparator<Entity> {
@Override
public int compare(Entity entity1, Entity entity2) {
diff --git
a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
new file mode 100644
index 0000000..c3276a8
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityExtInfo;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class AtlasPathExtractorUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(AtlasPathExtractorUtil.class);
+
+ // Common: qualified-name separators and attribute names used by the path handlers below
+ public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
+ public static final char QNAME_SEP_ENTITY_NAME = '.';
+ public static final String SCHEME_SEPARATOR = "://";
+ public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+ public static final String ATTRIBUTE_NAME = "name";
+ public static final String ATTRIBUTE_BUCKET = "bucket";
+
+ // HDFS: getPathEntity falls back to addHDFSPathEntity for paths matching none of the schemes below
+ public static final String HDFS_TYPE_PATH = "hdfs_path";
+ public static final String ATTRIBUTE_PATH = "path";
+ public static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
+ public static final String ATTRIBUTE_NAMESERVICE_ID = "nameServiceId";
+
+ // AWS S3 (s3:// and s3a:// paths): the v1 model uses aws_s3_bucket/aws_s3_pseudo_dir, the v2 model uses aws_s3_v2_bucket/aws_s3_v2_directory
+ public static final String AWS_S3_ATLAS_MODEL_VERSION_V2 =
"v2";
+ public static final String AWS_S3_BUCKET =
"aws_s3_bucket";
+ public static final String AWS_S3_PSEUDO_DIR =
"aws_s3_pseudo_dir";
+ public static final String AWS_S3_V2_BUCKET =
"aws_s3_v2_bucket";
+ public static final String AWS_S3_V2_PSEUDO_DIR =
"aws_s3_v2_directory";
+ public static final String S3_SCHEME =
"s3" + SCHEME_SEPARATOR;
+ public static final String S3A_SCHEME =
"s3a" + SCHEME_SEPARATOR;
+ public static final String ATTRIBUTE_CONTAINER =
"container";
+ public static final String ATTRIBUTE_OBJECT_PREFIX =
"objectPrefix";
+ public static final String RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS =
"aws_s3_bucket_aws_s3_pseudo_dirs";
+ public static final String RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED =
"aws_s3_v2_container_contained";
+
+ // ADLS Gen2 (abfs:// and abfss:// paths): account, container and directory entity types
+ public static final String ADLS_GEN2_ACCOUNT =
"adls_gen2_account";
+ public static final String ADLS_GEN2_CONTAINER =
"adls_gen2_container";
+ public static final String ADLS_GEN2_DIRECTORY =
"adls_gen2_directory";
+ public static final String ADLS_GEN2_ACCOUNT_HOST_SUFFIX =
".dfs.core.windows.net";
+ public static final String ABFS_SCHEME =
"abfs" + SCHEME_SEPARATOR;
+ public static final String ABFSS_SCHEME =
"abfss" + SCHEME_SEPARATOR;
+ public static final String ATTRIBUTE_ACCOUNT =
"account";
+ public static final String ATTRIBUTE_PARENT =
"parent";
+ public static final String RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS =
"adls_gen2_account_containers";
+ public static final String RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN =
"adls_gen2_parent_children";
+
+ // Ozone (ofs:// and o3fs:// paths): volume, bucket and key entity types
+ public static final String OZONE_VOLUME =
"ozone_volume";
+ public static final String OZONE_BUCKET =
"ozone_bucket";
+ public static final String OZONE_KEY = "ozone_key";
+ public static final String OZONE_SCHEME = "ofs" +
SCHEME_SEPARATOR;
+ public static final String OZONE_3_SCHEME = "o3fs" +
SCHEME_SEPARATOR;
+ public static final String ATTRIBUTE_VOLUME = "volume";
+ public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET =
"ozone_volume_buckets";
+ public static final String RELATIONSHIP_OZONE_BUCKET_KEY =
"ozone_bucket_keys";
+
+    /**
+     * Resolves the Atlas entity for a filesystem path by dispatching on its scheme prefix:
+     * s3:// / s3a:// (v1 or v2 model, per the context), abfs:// / abfss://, ofs:// / o3fs://,
+     * and addHDFSPathEntity for everything else.
+     *
+     * @param path    path to convert into an Atlas entity
+     * @param context metadata namespace, entity cache and conversion options
+     * @return the path entity, together with any referred entities the handler registered
+     *         (for example the S3 bucket entity)
+     */
+    public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) {
+        AtlasEntityWithExtInfo result     = new AtlasEntityWithExtInfo();
+        String                 rawPath    = path.toString();
+        // the (optionally) lower-cased string is only used to choose the handler
+        String                 schemePath = context.isConvertPathToLowerCase() ? rawPath.toLowerCase() : rawPath;
+        AtlasEntity            pathEntity;
+
+        if (isS3Path(schemePath)) {
+            if (isAwsS3AtlasModelVersionV2(context)) {
+                pathEntity = addS3PathEntityV2(path, result, context);
+            } else {
+                pathEntity = addS3PathEntityV1(path, result, context);
+            }
+        } else if (isAbfsPath(schemePath)) {
+            pathEntity = addAbfsPathEntity(path, result, context);
+        } else if (isOzonePath(schemePath)) {
+            pathEntity = addOzonePathEntity(path, result, context);
+        } else {
+            pathEntity = addHDFSPathEntity(path, context);
+        }
+
+        result.setEntity(pathEntity);
+
+        return result;
+    }
+
+    /**
+     * Tells whether the context selects the v2 AWS S3 model.
+     *
+     * @param context extractor context carrying the configured model version
+     * @return true when the configured version equals "v2" ignoring case; false for any
+     *         other value, including null and the empty string
+     */
+    private static boolean isAwsS3AtlasModelVersionV2(PathExtractorContext context) {
+        // StringUtils.equalsIgnoreCase is null-safe, so a separate isNotEmpty guard is redundant
+        return StringUtils.equalsIgnoreCase(context.getAwsS3AtlasModelVersion(), AWS_S3_ATLAS_MODEL_VERSION_V2);
+    }
+
+    /**
+     * Returns true when the path string starts with the s3:// or s3a:// prefix
+     * (case-sensitive); false for null.
+     */
+    private static boolean isS3Path(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(S3_SCHEME) || strPath.startsWith(S3A_SCHEME);
+    }
+
+    /**
+     * Returns true when the path string starts with the abfs:// or abfss:// prefix
+     * (case-sensitive); false for null.
+     */
+    private static boolean isAbfsPath(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(ABFS_SCHEME) || strPath.startsWith(ABFSS_SCHEME);
+    }
+
+    /**
+     * Returns true when the path string starts with the ofs:// or o3fs:// prefix
+     * (case-sensitive); false for null.
+     */
+    private static boolean isOzonePath(String strPath) {
+        if (strPath == null) {
+            return false;
+        }
+
+        return strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME);
+    }
+
+    /**
+     * Returns the v1-model aws_s3_pseudo_dir entity for an S3 path, taking it from the
+     * context cache when present. Otherwise it creates the entity, linked to its
+     * aws_s3_bucket entity (created and cached too if missing), and adds the bucket to
+     * {@code extInfo} as a referred entity.
+     *
+     * @param path    S3 path (s3:// or s3a://)
+     * @param extInfo receives the bucket entity when a new directory entity is created
+     * @param context metadata namespace and entity cache
+     * @return the cached or newly created pseudo-dir entity
+     */
+    private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV1(strPath={})", strPath);
+        }
+
+        URI         uri                 = path.toUri();
+        String      namespace           = context.getMetadataNamespace();
+        String      bucketName          = uri.getAuthority();
+        // scheme, bucket and path portions are lower-cased; the namespace suffix is not
+        String      bucketQualifiedName = (uri.getScheme() + SCHEME_SEPARATOR + bucketName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        String      pathQualifiedName   = (strPath + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + namespace;
+        AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+        AtlasEntity dirEntity           = context.getEntity(pathQualifiedName);
+
+        if (dirEntity == null) {
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_BUCKET);
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                context.putEntity(bucketQualifiedName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            // both objectPrefix and name are the lower-cased path without scheme and authority
+            String objectPrefix = Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase();
+
+            dirEntity = new AtlasEntity(AWS_S3_PSEUDO_DIR);
+            dirEntity.setRelationshipAttribute(ATTRIBUTE_BUCKET, AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_BUCKET_S3_PSEUDO_DIRS));
+            dirEntity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix);
+            dirEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+            dirEntity.setAttribute(ATTRIBUTE_NAME, objectPrefix);
+
+            context.putEntity(pathQualifiedName, dirEntity);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV1(strPath={})", strPath);
+        }
+
+        return dirEntity;
+    }
+
+    /**
+     * Builds the v2-model AWS S3 entities for a path: an aws_s3_v2_bucket entity for the
+     * bucket plus one aws_s3_v2_directory entity per path segment. Each directory is linked
+     * to its parent (bucket or directory) through the "container" relationship attribute.
+     * All entities are cached in {@code context} by qualified name.
+     *
+     * When the path's own qualified name is not already cached, the bucket entity is added
+     * to {@code extInfo} as a referred entity.
+     *
+     * @param path    S3 path (s3:// or s3a://)
+     * @param extInfo receives the bucket entity as a referred entity
+     * @param context metadata namespace and entity cache
+     * @return the deepest directory entity, or the bucket entity when the path has no
+     *         directory segments
+     */
+    private static AtlasEntity addS3PathEntityV2(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+        String strPath = path.toString();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        String      metadataNamespace = context.getMetadataNamespace();
+        String      pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+        AtlasEntity ret               = context.getEntity(pathQualifiedName);
+
+        if (ret == null) {
+            String      bucketName          = path.toUri().getAuthority();
+            String      schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase();
+            String      bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+            AtlasEntity bucketEntity        = context.getEntity(bucketQualifiedName);
+
+            if (bucketEntity == null) {
+                bucketEntity = new AtlasEntity(AWS_S3_V2_BUCKET);
+
+                bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName);
+                bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                }
+
+                context.putEntity(bucketQualifiedName, bucketEntity);
+            }
+
+            extInfo.addReferredEntity(bucketEntity);
+
+            AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+            String               parentPath  = Path.SEPARATOR;
+            String               dirPath     = path.toUri().getPath();
+
+            if (StringUtils.isEmpty(dirPath)) {
+                dirPath = Path.SEPARATOR;
+            }
+
+            for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+                if (StringUtils.isEmpty(subDirName)) {
+                    continue;
+                }
+
+                String subDirPath          = parentPath + subDirName + Path.SEPARATOR;
+                String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+                // Directory entities are cached under "<scheme>://<bucket>/<dir>/@<namespace>"
+                // (lower-cased scheme/bucket, trailing '/'), a key that differs from the
+                // pathQualifiedName looked up above. Reuse a cached directory rather than
+                // replacing it with a fresh entity (and a new guid) on every call.
+                ret = context.getEntity(subDirQualifiedName);
+
+                if (ret == null) {
+                    ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR);
+
+                    ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId);
+                    ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath);
+                    ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName);
+                    ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+                    }
+
+                    context.putEntity(subDirQualifiedName, ret);
+                }
+
+                parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_AWS_S3_V2_CONTAINER_CONTAINED);
+                parentPath  = subDirPath;
+            }
+
+            // no directory segments: the bucket itself is the path entity
+            if (ret == null) {
+                ret = bucketEntity;
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== addS3PathEntityV2(strPath={})", strPath);
+        }
+
+        return ret;
+    }
+
+ private static AtlasEntity addAbfsPathEntity(Path path, AtlasEntityExtInfo
extInfo, PathExtractorContext context) {
+ String strPath = path.toString();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> addAbfsPathEntity(strPath={})", strPath);
+ }
+
+ String metadataNamespace = context.getMetadataNamespace();
+ String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace;
+ AtlasEntity ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ String abfsScheme = path.toUri().getScheme();
+ String storageAcctName =
getAbfsStorageAccountName(path.toUri());
+ String schemeAndStorageAcctName = (abfsScheme +
SCHEME_SEPARATOR + storageAcctName).toLowerCase();
+ String storageAcctQualifiedName = schemeAndStorageAcctName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+ AtlasEntity storageAcctEntity =
context.getEntity(storageAcctQualifiedName);
+
+ // create adls-gen2 storage-account entity
+ if (storageAcctEntity == null) {
+ storageAcctEntity = new AtlasEntity(ADLS_GEN2_ACCOUNT);
+
+ storageAcctEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
storageAcctQualifiedName);
+ storageAcctEntity.setAttribute(ATTRIBUTE_NAME,
storageAcctName);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
storageAcctEntity.getTypeName(),
storageAcctEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(storageAcctQualifiedName, storageAcctEntity);
+ }
+
+ extInfo.addReferredEntity(storageAcctEntity);
+
+ AtlasRelatedObjectId storageAcctObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(storageAcctEntity,
RELATIONSHIP_ADLS_GEN2_ACCOUNT_CONTAINERS);
+
+ // create adls-gen2 container entity linking to storage account
+ String containerName = path.toUri().getUserInfo();
+ String schemeAndContainerName = (abfsScheme +
SCHEME_SEPARATOR + containerName + QNAME_SEP_METADATA_NAMESPACE +
storageAcctName).toLowerCase();
+ String containerQualifiedName = schemeAndContainerName +
QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+ AtlasEntity containerEntity =
context.getEntity(containerQualifiedName);
+
+ if (containerEntity == null) {
+ containerEntity = new AtlasEntity(ADLS_GEN2_CONTAINER);
+
+ containerEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
containerQualifiedName);
+ containerEntity.setAttribute(ATTRIBUTE_NAME, containerName);
+ containerEntity.setRelationshipAttribute(ATTRIBUTE_ACCOUNT,
storageAcctObjId);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
containerEntity.getTypeName(),
containerEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(containerQualifiedName, containerEntity);
+ }
+
+ extInfo.addReferredEntity(containerEntity);
+
+ // create adls-gen2 directory entity linking to container
+ AtlasRelatedObjectId parentObjId =
AtlasTypeUtil.getAtlasRelatedObjectId(containerEntity,
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+ String parentPath = Path.SEPARATOR;
+ String dirPath = path.toUri().getPath();
+
+ if (StringUtils.isEmpty(dirPath)) {
+ dirPath = Path.SEPARATOR;
+ }
+
+ for (String subDirName : dirPath.split(Path.SEPARATOR)) {
+ if (StringUtils.isEmpty(subDirName)) {
+ continue;
+ }
+
+ String subDirPath = parentPath + subDirName +
Path.SEPARATOR;
+ String subDirQualifiedName = schemeAndContainerName +
subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+
+ ret = new AtlasEntity(ADLS_GEN2_DIRECTORY);
+
+ ret.setRelationshipAttribute(ATTRIBUTE_PARENT, parentObjId);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
subDirQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, subDirName);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(subDirQualifiedName, ret);
+
+ parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret,
RELATIONSHIP_ADLS_GEN2_PARENT_CHILDREN);
+ parentPath = subDirPath;
+ }
+
+ if (ret == null) {
+ ret = storageAcctEntity;
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== addAbfsPathEntity(strPath={})", strPath);
+ }
+
+ return ret;
+ }
+
+ private static AtlasEntity addOzonePathEntity(Path path,
AtlasEntityExtInfo extInfo, PathExtractorContext context) {
+ String strPath = path.toString();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> addOzonePathEntity(strPath={})", strPath);
+ }
+
+ String metadataNamespace = context.getMetadataNamespace();
+ String ozoneScheme = path.toUri().getScheme();
+ String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace;
+ AtlasEntity ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ //create ozone volume entity
+ String volumeName = getOzoneVolumeName(path);
+ String volumeQualifiedName = ozoneScheme + SCHEME_SEPARATOR +
volumeName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+ AtlasEntity volumeEntity =
context.getEntity(volumeQualifiedName);
+
+ if (volumeEntity == null) {
+ volumeEntity = new AtlasEntity(OZONE_VOLUME);
+
+ volumeEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
volumeQualifiedName);
+ volumeEntity.setAttribute(ATTRIBUTE_NAME, volumeName);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
volumeEntity.getTypeName(),
volumeEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(volumeQualifiedName, volumeEntity);
+ }
+
+ extInfo.addReferredEntity(volumeEntity);
+
+ //create ozone bucket entity
+ String bucketName = getOzoneBucketName(path);
+ String bucketQualifiedName = ozoneScheme + SCHEME_SEPARATOR +
volumeName + QNAME_SEP_ENTITY_NAME + bucketName + QNAME_SEP_METADATA_NAMESPACE
+ metadataNamespace;
+ AtlasEntity bucketEntity =
context.getEntity(bucketQualifiedName);
+
+ if (bucketEntity == null) {
+ bucketEntity = new AtlasEntity(OZONE_BUCKET);
+
+ bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
bucketQualifiedName);
+ bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName);
+ bucketEntity.setRelationshipAttribute( ATTRIBUTE_VOLUME,
AtlasTypeUtil.getAtlasRelatedObjectId(volumeEntity,
RELATIONSHIP_OZONE_VOLUME_BUCKET));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding entity: typeName={}, qualifiedName={}",
bucketEntity.getTypeName(),
bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ }
+
+ context.putEntity(bucketQualifiedName, bucketEntity);
+ }
+
+ extInfo.addReferredEntity(bucketEntity);
+
+ ret = new AtlasEntity(OZONE_KEY);
+
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, path.toUri().getPath());
+ ret.setRelationshipAttribute( ATTRIBUTE_BUCKET,
AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity,
RELATIONSHIP_OZONE_BUCKET_KEY));
+
+ context.putEntity(pathQualifiedName, ret);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== addOzonePathEntity(strPath={})", strPath);
+ }
+
+ return ret;
+ }
+
+ private static AtlasEntity addHDFSPathEntity(Path path,
PathExtractorContext context) {
+ String strPath = path.toString();
+
+ if (context.isConvertPathToLowerCase()) {
+ strPath = strPath.toLowerCase();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> addHDFSPathEntity(strPath={})", strPath);
+ }
+
+ String nameServiceID =
HdfsNameServiceResolver.getNameServiceIDForPath(strPath);
+ String attrPath = StringUtils.isEmpty(nameServiceID) ?
strPath : HdfsNameServiceResolver.getPathWithNameServiceID(strPath);
+ String pathQualifiedName = getQualifiedName(attrPath,
context.getMetadataNamespace());
+ AtlasEntity ret = context.getEntity(pathQualifiedName);
+
+ if (ret == null) {
+ ret = new AtlasEntity(HDFS_TYPE_PATH);
+
+ if (StringUtils.isNotEmpty(nameServiceID)) {
+ ret.setAttribute(ATTRIBUTE_NAMESERVICE_ID, nameServiceID);
+ }
+
+ String name =
Path.getPathWithoutSchemeAndAuthority(path).toString();
+
+ if (context.isConvertPathToLowerCase()) {
+ name = name.toLowerCase();
+ }
+
+ ret.setAttribute(ATTRIBUTE_PATH, attrPath);
+ ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, pathQualifiedName);
+ ret.setAttribute(ATTRIBUTE_NAME, name);
+ ret.setAttribute(ATTRIBUTE_CLUSTER_NAME,
context.getMetadataNamespace());
+
+ context.putEntity(pathQualifiedName, ret);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<== addHDFSPathEntity(strPath={})", strPath);
+ }
+
+ return ret;
+ }
+
+ private static String getAbfsStorageAccountName(URI uri) {
+ String ret = null;
+ String host = uri.getHost();
+
+ // host: "<account_name>.dfs.core.windows.net"
+ if (StringUtils.isNotEmpty(host) &&
host.contains(ADLS_GEN2_ACCOUNT_HOST_SUFFIX)) {
+ ret = host.substring(0,
host.indexOf(ADLS_GEN2_ACCOUNT_HOST_SUFFIX));
+ }
+
+ return ret;
+ }
+
+ private static String getOzoneVolumeName(Path path) {
+ String pathAuthority = path.toUri().getAuthority();
+ // pathAuthority: "<bucket_name>.<volume_name>.<ozone.service.id>"
+ return pathAuthority.split("\\.")[1];
+ }
+
+ private static String getOzoneBucketName(Path path) {
+ String pathAuthority = path.toUri().getAuthority();
+ return pathAuthority.split("\\.")[0];
+ }
+
+ private static String getQualifiedName(String path, String
metadataNamespace) {
+ if (path.startsWith(HdfsNameServiceResolver.HDFS_SCHEME)) {
+ return path + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace;
+ }
+
+ return path.toLowerCase();
+ }
+}
\ No newline at end of file
diff --git
a/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java
b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java
new file mode 100644
index 0000000..ce688e4
--- /dev/null
+++ b/common/src/main/java/org/apache/atlas/utils/PathExtractorContext.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class PathExtractorContext {
+ private final String metadataNamespace;
+ private final Map<String, AtlasEntity> knownEntities;
+ private final boolean isConvertPathToLowerCase;
+ private final String awsS3AtlasModelVersion;
+
+ public PathExtractorContext(String metadataNamespace) {
+ this(metadataNamespace, new HashMap<>(), false, null) ;
+ }
+
+ public PathExtractorContext(String metadataNamespace, String
awsS3AtlasModelVersion) {
+ this(metadataNamespace, new HashMap<>(), false,
awsS3AtlasModelVersion) ;
+ }
+
+ public PathExtractorContext(String metadataNamespace, boolean
isConvertPathToLowerCase, String awsS3AtlasModelVersion) {
+ this(metadataNamespace, new HashMap<>(), isConvertPathToLowerCase,
awsS3AtlasModelVersion) ;
+ }
+
+ public PathExtractorContext(String metadataNamespace, Map<String,
AtlasEntity> knownEntities, boolean isConvertPathToLowerCase, String
awsS3AtlasModelVersion) {
+ this.metadataNamespace = metadataNamespace;
+ this.knownEntities = knownEntities;
+ this.isConvertPathToLowerCase = isConvertPathToLowerCase;
+ this.awsS3AtlasModelVersion = awsS3AtlasModelVersion;
+ }
+
+ public String getMetadataNamespace() {
+ return metadataNamespace;
+ }
+
+ public Map<String, AtlasEntity> getKnownEntities() {
+ return knownEntities;
+ }
+
+ public void putEntity(String qualifiedName, AtlasEntity entity) {
+ knownEntities.put(qualifiedName, entity);
+ }
+
+ public AtlasEntity getEntity(String qualifiedName) {
+ return knownEntities.get(qualifiedName);
+ }
+
+ public boolean isConvertPathToLowerCase() {
+ return isConvertPathToLowerCase;
+ }
+
+ public String getAwsS3AtlasModelVersion() {
+ return awsS3AtlasModelVersion;
+ }
+}
\ No newline at end of file
diff --git
a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
new file mode 100644
index 0000000..664bfb7
--- /dev/null
+++
b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+import org.apache.hadoop.fs.Path;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+public class AtlasPathExtractorUtilTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(AtlasPathExtractorUtilTest.class);
+
+ // Common
+ private static final String METADATA_NAMESPACE = "metaspace";
+ private static final String QNAME_METADATA_NAMESPACE = '@' +
METADATA_NAMESPACE;
+ private static final String SCHEME_SEPARATOR = "://";
+ private static final String ATTRIBUTE_NAME = "name";
+ private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+
+ // HDFS
+ private static final String HDFS_PATH_TYPE = "hdfs_path";
+ private static final String ATTRIBUTE_PATH = "path";
+ private static final String ATTRIBUTE_CLUSTER_NAME = "clusterName";
+
+ // Ozone
+ private static final String OZONE_VOLUME = "ozone_volume";
+ private static final String OZONE_BUCKET = "ozone_bucket";
+ private static final String OZONE_KEY = "ozone_key";
+ private static final String OZONE_SCHEME = "ofs" + SCHEME_SEPARATOR;
+ private static final String OZONE_3_SCHEME = "o3fs" + SCHEME_SEPARATOR;
+ private static final String OZONE_PATH = OZONE_SCHEME +
"bucket1.volume1.ozone1/files/file.txt";
+ private static final String OZONE_3_PATH = OZONE_3_SCHEME +
"bucket1.volume1.ozone1/files/file.txt";
+
+ // HDFS
+ private static final String HDFS_SCHEME = "hdfs" + SCHEME_SEPARATOR;
+ private static final String HDFS_PATH = HDFS_SCHEME +
"host_name:8020/warehouse/tablespace/external/hive/taBlE_306";
+
+ @Test
+ public void testGetPathEntityOzone3Path() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE);
+
+ Path path = new Path(OZONE_3_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertNotNull(entity);
+ assertEquals(entity.getTypeName(), OZONE_KEY);
+ verifyOzoneKeyEntity(OZONE_3_PATH, entity);
+
+ assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
+ verifyOzoneEntities(OZONE_3_SCHEME, OZONE_3_PATH,
extractorContext.getKnownEntities());
+
+ assertEquals(extractorContext.getKnownEntities().size(), 3);
+ verifyOzoneEntities(OZONE_3_SCHEME, OZONE_3_PATH,
extractorContext.getKnownEntities());
+ }
+
+ @Test
+ public void testGetPathEntityOzonePath() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE);
+
+ Path path = new Path(OZONE_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertNotNull(entity);
+ assertEquals(entity.getTypeName(), OZONE_KEY);
+ verifyOzoneKeyEntity(OZONE_PATH, entity);
+
+ assertEquals(entityWithExtInfo.getReferredEntities().size(), 2);
+ verifyOzoneEntities(OZONE_SCHEME, OZONE_PATH,
extractorContext.getKnownEntities());
+
+ assertEquals(extractorContext.getKnownEntities().size(), 3);
+ verifyOzoneEntities(OZONE_SCHEME, OZONE_PATH,
extractorContext.getKnownEntities());
+ }
+
+ @Test
+ public void testGetPathEntityHdfsPath() {
+ Map<String, AtlasEntity> knownEntities = new HashMap<>();
+ AtlasEntityWithExtInfo extInfo = new AtlasEntityWithExtInfo();
+
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE);
+
+ Path path = new Path(HDFS_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertNotNull(entity);
+ assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
+ verifyHDFSEntity(entity, false);
+
+ assertNull(extInfo.getReferredEntities());
+ assertEquals(extractorContext.getKnownEntities().size(), 1);
+ extractorContext.getKnownEntities().values().forEach(x ->
verifyHDFSEntity(x, false));
+ }
+
+ @Test
+ public void testGetPathEntityHdfsPathLowerCase() {
+ PathExtractorContext extractorContext = new
PathExtractorContext(METADATA_NAMESPACE, true, null);
+
+ Path path = new Path(HDFS_PATH);
+ AtlasEntityWithExtInfo entityWithExtInfo =
AtlasPathExtractorUtil.getPathEntity(path, extractorContext);
+ AtlasEntity entity = entityWithExtInfo.getEntity();
+
+ assertNotNull(entity);
+ assertEquals(entity.getTypeName(), HDFS_PATH_TYPE);
+ verifyHDFSEntity(entity, true);
+
+ assertNull(entityWithExtInfo.getReferredEntities());
+ assertEquals(extractorContext.getKnownEntities().size(), 1);
+ extractorContext.getKnownEntities().values().forEach(x ->
verifyHDFSEntity(x, true));
+ }
+
+ private void verifyOzoneEntities(String scheme, String path, Map<String,
AtlasEntity> knownEntities) {
+ for (AtlasEntity knownEntity : knownEntities.values()) {
+ switch (knownEntity.getTypeName()){
+ case OZONE_KEY:
+ verifyOzoneKeyEntity(path, knownEntity);
+ break;
+
+ case OZONE_VOLUME:
+
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme +
"volume1" + QNAME_METADATA_NAMESPACE);
+ assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME),
"volume1");
+ break;
+
+ case OZONE_BUCKET:
+
assertEquals(knownEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme +
"volume1.bucket1" + QNAME_METADATA_NAMESPACE);
+ assertEquals(knownEntity.getAttribute(ATTRIBUTE_NAME),
"bucket1");
+ break;
+ }
+ }
+ }
+
+ private void verifyOzoneKeyEntity(String path, AtlasEntity entity) {
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), path +
QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "/files/file.txt");
+ }
+
+ private void verifyHDFSEntity(AtlasEntity entity, boolean toLowerCase) {
+ if (toLowerCase) {
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
HDFS_PATH.toLowerCase() + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"/warehouse/tablespace/external/hive/table_306");
+ assertEquals(entity.getAttribute(ATTRIBUTE_PATH),
HDFS_PATH.toLowerCase());
+ assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME),
METADATA_NAMESPACE);
+ } else {
+ assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
HDFS_PATH + QNAME_METADATA_NAMESPACE);
+ assertEquals(entity.getAttribute(ATTRIBUTE_NAME),
"/warehouse/tablespace/external/hive/taBlE_306");
+ assertEquals(entity.getAttribute(ATTRIBUTE_PATH), HDFS_PATH);
+ assertEquals(entity.getAttribute(ATTRIBUTE_CLUSTER_NAME),
METADATA_NAMESPACE);
+ }
+ }
+}