This is an automated email from the ASF dual-hosted git repository. lide pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/branch-0.15 by this push: new a90302b549 [Feature] add support for tencent chdfs (#8964) a90302b549 is described below commit a90302b549617d854f6d9792b465b0b31c7b723f Author: wucheng <wucheng.xid...@foxmail.com> AuthorDate: Tue Apr 12 15:37:31 2022 +0800 [Feature] add support for tencent chdfs (#8964) Co-authored-by: chengwu <chen...@tencent.com> --- .../java/org/apache/doris/analysis/ExportStmt.java | 5 +- .../org/apache/doris/analysis/StorageBackend.java | 3 + .../doris/broker/hdfs/FileSystemManager.java | 135 ++++++++++++++++++++- gensrc/thrift/Types.thrift | 1 + 4 files changed, 141 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 275dc51bf1..095dbbd68a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -238,8 +238,9 @@ public class ExportStmt extends StatementBase { String schema = uri.getScheme(); if (type == StorageBackend.StorageType.BROKER) { if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs") - && !schema.equalsIgnoreCase("hdfs"))) { - throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path."); + && !schema.equalsIgnoreCase("hdfs") && !schema.equalsIgnoreCase("ofs"))) { + throw new AnalysisException("Invalid export path. 
please use valid 'HDFS://', 'AFS://' , 'BOS://', " + + "or 'ofs://' path."); } } else if (type == StorageBackend.StorageType.S3) { if (schema == null || !schema.equalsIgnoreCase("s3")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java index 933153b728..70b10b4f06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java @@ -111,6 +111,7 @@ public class StorageBackend extends StorageDesc implements ParseNode { public enum StorageType { BROKER("Doris Broker"), S3("Amazon S3 Simple Storage Service"), + OFS("Tencent CHDFS"), // the following is not used currently HDFS("Hadoop Distributed File System"), LOCAL("Local file system"); @@ -132,6 +133,8 @@ public class StorageBackend extends StorageDesc implements ParseNode { return TStorageBackendType.S3; case HDFS: return TStorageBackendType.HDFS; + case OFS: + return TStorageBackendType.OFS; case LOCAL: return TStorageBackendType.LOCAL; default: diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 6cedd70449..526c914c7e 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -66,6 +66,7 @@ public class FileSystemManager { // supported scheme private static final String HDFS_SCHEME = "hdfs"; private static final String S3A_SCHEME = "s3a"; + private static final String CHDFS_SCHEME = "ofs"; private static final String USER_NAME_KEY = "username"; private static final String PASSWORD_KEY = "password"; @@ -152,7 +153,9 @@ public class FileSystemManager { brokerFileSystem = getDistributedFileSystem(path, 
properties); } else if (scheme.equals(S3A_SCHEME)) { brokerFileSystem = getS3AFileSystem(path, properties); - } else { + } else if (scheme.equals(CHDFS_SCHEME)) { + brokerFileSystem = getChdfsFileSystem(path, properties); + }else { throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH, "invalid path. scheme is not supported"); } @@ -421,6 +424,136 @@ public class FileSystemManager { } } + /** + * visible for test + * + * file system handle is cached, the identity is for all chdfs. + * @param path + * @param properties + * @return + * @throws URISyntaxException + * @throws Exception + */ + public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> properties) { + WildcardURI pathUri = new WildcardURI(path); + String host = CHDFS_SCHEME; + String authentication = properties.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + AUTHENTICATION_SIMPLE); + if (Strings.isNullOrEmpty(authentication) || (!authentication.equals(AUTHENTICATION_SIMPLE) + && !authentication.equals(AUTHENTICATION_KERBEROS))) { + logger.warn("invalid authentication:" + authentication); + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + "invalid authentication:" + authentication); + } + + FileSystemIdentity fileSystemIdentity = null; + if (authentication.equals(AUTHENTICATION_SIMPLE)) { + fileSystemIdentity = new FileSystemIdentity(host, ""); + } else { + // for kerberos, use host + principal + keytab as filesystemindentity + String kerberosContent = ""; + if (properties.containsKey(KERBEROS_KEYTAB)) { + kerberosContent = properties.get(KERBEROS_KEYTAB); + } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) { + kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT); + } else { + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + "keytab is required for kerberos authentication"); + } + if (!properties.containsKey(KERBEROS_PRINCIPAL)) { + throw new 
BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + "principal is required for kerberos authentication"); + } else { + kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL); + } + try { + MessageDigest digest = MessageDigest.getInstance("md5"); + byte[] result = digest.digest(kerberosContent.getBytes()); + String kerberosUgi = new String(result); + fileSystemIdentity = new FileSystemIdentity(host, kerberosUgi); + } catch (NoSuchAlgorithmException e) { + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + e.getMessage()); + } + } + + + BrokerFileSystem fileSystem = null; + cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); + fileSystem = cachedFileSystem.get(fileSystemIdentity); + if (fileSystem == null) { + // it means it is removed concurrently by checker thread + return null; + } + fileSystem.getLock().lock(); + + try { + if (!cachedFileSystem.containsKey(fileSystemIdentity)) { + // this means the file system is closed by file system checker thread + // it is a corner case + return null; + } + // create a new filesystem + Configuration conf = new Configuration(); + for (Map.Entry<String, String> propElement : properties.entrySet()) { + conf.set(propElement.getKey(), propElement.getValue()); + } + + if (fileSystem.getDFSFileSystem() == null) { + logger.info("create file system for new path " + path); + String tmpFilePath = null; + if (authentication.equals(AUTHENTICATION_KERBEROS)){ + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + AUTHENTICATION_KERBEROS); + + String principal = preparePrincipal(properties.get(KERBEROS_PRINCIPAL)); + String keytab = ""; + if (properties.containsKey(KERBEROS_KEYTAB)) { + keytab = properties.get(KERBEROS_KEYTAB); + } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) { + // pass kerberos keytab content use base64 encoding + // so decode it and write it to tmp path under /tmp + // because ugi api only accept a 
local path as argument + String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT); + byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content); + long currentTime = System.currentTimeMillis(); + Random random = new Random(currentTime); + int randNumber = random.nextInt(10000); + tmpFilePath = "/tmp/." + Long.toString(currentTime) + "_" + Integer.toString(randNumber); + FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath); + fileOutputStream.write(base64decodedBytes); + fileOutputStream.close(); + keytab = tmpFilePath; + } else { + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + "keytab is required for kerberos authentication"); + } + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab(principal, keytab); + if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) { + try { + File file = new File(tmpFilePath); + if(!file.delete()){ + logger.warn("delete tmp file:" + tmpFilePath + " failed"); + } + } catch (Exception e) { + throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND, + e.getMessage()); + } + } + } + FileSystem chdfsFileSystem = FileSystem.get(pathUri.getUri(), conf); + fileSystem.setFileSystem(chdfsFileSystem); + } + return fileSystem; + } catch (Exception e) { + logger.error("errors while connect to " + path, e); + throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e); + } finally { + fileSystem.getLock().unlock(); + } + } + public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) { List<TBrokerFileStatus> resultFileStatus = null; WildcardURI pathUri = new WildcardURI(path); diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 8c9a46ff48..ac0838b4db 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -91,6 +91,7 @@ enum TTypeNodeType { enum TStorageBackendType { BROKER, S3, + OFS, HDFS, LOCAL } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org