This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a584cc04178 [feature](paimon)support native reader for 2.0 (#29612)
a584cc04178 is described below
commit a584cc04178cc58ff8596ba072fdf7e970364323
Author: wuwenchi <[email protected]>
AuthorDate: Sun Jan 7 21:46:04 2024 +0800
[feature](paimon)support native reader for 2.0 (#29612)
bp #29339
---
be/src/vec/exec/scan/vfile_scanner.cpp | 17 +-
.../java/org/apache/doris/common/FeConstants.java | 30 +-
.../org/apache/doris/common/util/LocationPath.java | 380 +++++++++++++++++++++
.../property/constants/PaimonProperties.java | 1 +
.../planner/external/paimon/PaimonScanNode.java | 58 +++-
.../planner/external/paimon/PaimonSource.java | 5 +
.../doris/planner/external/paimon/PaimonSplit.java | 26 +-
fe/pom.xml | 4 +-
gensrc/thrift/PlanNodes.thrift | 1 +
9 files changed, 497 insertions(+), 25 deletions(-)
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 72ae8e2d73f..d6ba62db53a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -676,11 +676,22 @@ Status VFileScanner::_get_next_reader() {
// JNI reader can only push down column value range
bool push_down_predicates =
!_is_load && _params->format_type !=
TFileFormatType::FORMAT_JNI;
-    if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
-        range.table_format_params.table_format_type == "hudi") {
-        if (range.table_format_params.hudi_params.delta_logs.empty()) {
+    if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
+        if (range.table_format_params.table_format_type == "hudi" &&
+            range.table_format_params.hudi_params.delta_logs.empty()) {
             // fall back to native reader if there is no log file
             format_type = TFileFormatType::FORMAT_PARQUET;
+        } else if (range.table_format_params.table_format_type == "paimon" &&
+                   !range.table_format_params.paimon_params.__isset.paimon_split) {
+            // use native reader
+            auto format = range.table_format_params.paimon_params.file_format;
+            if (format == "orc") {
+                format_type = TFileFormatType::FORMAT_ORC;
+            } else if (format == "parquet") {
+                format_type = TFileFormatType::FORMAT_PARQUET;
+            } else {
+                return Status::InternalError("Not supported paimon file format: {}", format);
+            }
}
}
bool need_to_get_parsed_schema = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 487d50283d9..3012f6a62e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -65,21 +65,21 @@ public class FeConstants {
public static long tablet_checker_interval_ms = 20 * 1000L;
public static long tablet_schedule_interval_ms = 1000L;
- public static String FS_PREFIX_S3 = "s3";
- public static String FS_PREFIX_S3A = "s3a";
- public static String FS_PREFIX_S3N = "s3n";
- public static String FS_PREFIX_OSS = "oss";
- public static String FS_PREFIX_GCS = "gs";
- public static String FS_PREFIX_BOS = "bos";
- public static String FS_PREFIX_COS = "cos";
- public static String FS_PREFIX_COSN = "cosn";
- public static String FS_PREFIX_OBS = "obs";
- public static String FS_PREFIX_OFS = "ofs";
- public static String FS_PREFIX_GFS = "gfs";
- public static String FS_PREFIX_JFS = "jfs";
- public static String FS_PREFIX_HDFS = "hdfs";
- public static String FS_PREFIX_VIEWFS = "viewfs";
- public static String FS_PREFIX_FILE = "file";
+ public static final String FS_PREFIX_S3 = "s3";
+ public static final String FS_PREFIX_S3A = "s3a";
+ public static final String FS_PREFIX_S3N = "s3n";
+ public static final String FS_PREFIX_OSS = "oss";
+ public static final String FS_PREFIX_GCS = "gs";
+ public static final String FS_PREFIX_BOS = "bos";
+ public static final String FS_PREFIX_COS = "cos";
+ public static final String FS_PREFIX_COSN = "cosn";
+ public static final String FS_PREFIX_OBS = "obs";
+ public static final String FS_PREFIX_OFS = "ofs";
+ public static final String FS_PREFIX_GFS = "gfs";
+ public static final String FS_PREFIX_JFS = "jfs";
+ public static final String FS_PREFIX_HDFS = "hdfs";
+ public static final String FS_PREFIX_VIEWFS = "viewfs";
+ public static final String FS_PREFIX_FILE = "file";
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX =
"internal_tmp_materialized_view_";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
new file mode 100644
index 00000000000..d56e67bb0d1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -0,0 +1,380 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.HdfsResource;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.property.constants.CosProperties;
+import org.apache.doris.datasource.property.constants.ObsProperties;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.FileSystemType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class LocationPath {
+ private static final Logger LOG = LogManager.getLogger(LocationPath.class);
+ private static final String SCHEME_DELIM = "://";
+ private static final String NONSTANDARD_SCHEME_DELIM = ":/";
+ private final LocationType locationType;
+ private final String location;
+
+ enum LocationType {
+ HDFS,
+ LOCAL, // Local File
+ BOS, // Baidu
+ GCS, // Google,
+ OBS, // Huawei,
+ COS, // Tencent
+ COSN, // Tencent
+ OFS, // Tencent CHDFS
+ GFS, // Tencent GooseFs,
+ OSS, // Alibaba,
+ OSS_HDFS, // JindoFS on OSS
+ JFS, // JuiceFS,
+ S3,
+ S3A,
+ S3N,
+ VIEWFS,
+ UNKNOWN
+ }
+
+ private LocationPath(String location) {
+ this(location, new HashMap<>());
+ }
+
+ public LocationPath(String location, Map<String, String> props) {
+ String scheme = parseScheme(location).toLowerCase();
+ switch (scheme) {
+ case FeConstants.FS_PREFIX_HDFS:
+ locationType = LocationType.HDFS;
+ // Need add hdfs host to location
+ String host = props.get(HdfsResource.DSF_NAMESERVICES);
+ this.location = normalizedHdfsPath(location, host);
+ break;
+ case FeConstants.FS_PREFIX_S3:
+ locationType = LocationType.S3;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_S3A:
+ locationType = LocationType.S3A;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_S3N:
+ // include the check for multi locations and in a table, such
as both s3 and hdfs are in a table.
+ locationType = LocationType.S3N;
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_BOS:
+ locationType = LocationType.BOS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_GCS:
+ locationType = LocationType.GCS;
+ // use s3 client to access
+ this.location = convertToS3(location);
+ break;
+ case FeConstants.FS_PREFIX_OSS:
+ if (isHdfsOnOssEndpoint(location)) {
+ locationType = LocationType.OSS_HDFS;
+ this.location = location;
+ } else {
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OSS;
+ }
+ break;
+ case FeConstants.FS_PREFIX_COS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.COS;
+ break;
+ case FeConstants.FS_PREFIX_OBS:
+ if (useS3EndPoint(props)) {
+ this.location = convertToS3(location);
+ } else {
+ this.location = location;
+ }
+ locationType = LocationType.OBS;
+ break;
+ case FeConstants.FS_PREFIX_OFS:
+ locationType = LocationType.OFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_JFS:
+ locationType = LocationType.JFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_GFS:
+ locationType = LocationType.GFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_COSN:
+ // if treat cosn(tencent hadoop-cos) as a s3 file system, may
bring incompatible issues
+ locationType = LocationType.COSN;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_VIEWFS:
+ locationType = LocationType.VIEWFS;
+ this.location = location;
+ break;
+ case FeConstants.FS_PREFIX_FILE:
+ locationType = LocationType.LOCAL;
+ this.location = location;
+ break;
+ default:
+ locationType = LocationType.UNKNOWN;
+ this.location = location;
+ }
+ }
+
+ private static String parseScheme(String location) {
+ String[] schemeSplit = location.split(SCHEME_DELIM);
+ if (schemeSplit.length > 1) {
+ return schemeSplit[0];
+ } else {
+ schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
+ if (schemeSplit.length > 1) {
+ return schemeSplit[0];
+ }
+ throw new IllegalArgumentException("Fail to parse scheme, invalid
location: " + location);
+ }
+ }
+
+ private boolean useS3EndPoint(Map<String, String> props) {
+ if (props.containsKey(ObsProperties.ENDPOINT)
+ || props.containsKey(OssProperties.ENDPOINT)
+ || props.containsKey(CosProperties.ENDPOINT)) {
+ return false;
+ }
+ // wide check range for the compatibility of s3 properties
+ return (props.containsKey(S3Properties.ENDPOINT) ||
props.containsKey(S3Properties.Env.ENDPOINT));
+ }
+
+ public static boolean isHdfsOnOssEndpoint(String location) {
+ // example: cn-shanghai.oss-dls.aliyuncs.com contains the
"oss-dls.aliyuncs".
+ //
https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
+ return location.contains("oss-dls.aliyuncs");
+ }
+
+ /**
+ * The converted path is used for FE to get metadata
+ * @param location origin location
+ * @return metadata location path. just convert when storage is compatible
with s3 client.
+ */
+ private static String convertToS3(String location) {
+ LOG.debug("try convert location to s3 prefix: " + location);
+ int pos = findDomainPos(location);
+ return "s3" + location.substring(pos);
+ }
+
+ private static int findDomainPos(String rangeLocation) {
+ int pos = rangeLocation.indexOf("://");
+ if (pos == -1) {
+ throw new RuntimeException("No '://' found in location: " +
rangeLocation);
+ }
+ return pos;
+ }
+
+ private static String normalizedHdfsPath(String location, String host) {
+ try {
+ // Hive partition may contain special characters such as ' ', '<',
'>' and so on.
+ // Need to encode these characters before creating URI.
+ // But doesn't encode '/' and ':' so that we can get the correct
uri host.
+ location = URLEncoder.encode(location,
StandardCharsets.UTF_8.name())
+ .replace("%2F", "/").replace("%3A", ":");
+ URI normalizedUri = new URI(location);
+ // compatible with 'hdfs:///' or 'hdfs:/'
+ if (StringUtils.isEmpty(normalizedUri.getHost())) {
+ location = URLDecoder.decode(location,
StandardCharsets.UTF_8.name());
+ String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
+ String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
+ if (location.startsWith(brokenPrefix) &&
!location.startsWith(normalizedPrefix)) {
+ location = location.replace(brokenPrefix,
normalizedPrefix);
+ }
+ if (StringUtils.isNotEmpty(host)) {
+ // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
+ // Or hdfs:///abc to hdfs://name_service/abc
+ return location.replace(normalizedPrefix, normalizedPrefix
+ host + "/");
+ } else {
+ // 'hdfs://null/' equals the 'hdfs:///'
+ if (location.startsWith(HdfsResource.HDFS_PREFIX + "///"))
{
+ // Do not support hdfs:///location
+ throw new RuntimeException("Invalid location with
empty host: " + location);
+ } else {
+ // Replace 'hdfs://key/' to '/key/', try access local
NameNode on BE.
+ return location.replace(normalizedPrefix, "/");
+ }
+ }
+ }
+ return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
+ } catch (URISyntaxException | UnsupportedEncodingException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Pair<FileSystemType, String> getFSIdentity(String location,
String bindBrokerName) {
+ LocationPath locationPath = new LocationPath(location);
+ FileSystemType fsType = (bindBrokerName != null) ?
FileSystemType.BROKER : locationPath.getFileSystemType();
+ URI uri = locationPath.getPath().toUri();
+ String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" +
Strings.nullToEmpty(uri.getAuthority());
+ return Pair.of(fsType, fsIdent);
+ }
+
+ private FileSystemType getFileSystemType() {
+ FileSystemType fsType;
+ switch (locationType) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // All storage will use s3 client to access on BE, so need
convert to s3
+ fsType = FileSystemType.S3;
+ break;
+ case COSN:
+ case OFS:
+ // ofs:// and cosn:// use the same underlying file system:
Tencent Cloud HDFS, aka CHDFS)) {
+ fsType = FileSystemType.OFS;
+ break;
+ case HDFS:
+ case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib
to access oss.
+ case VIEWFS:
+ case GFS:
+ fsType = FileSystemType.DFS;
+ break;
+ case JFS:
+ fsType = FileSystemType.JFS;
+ break;
+ case LOCAL:
+ fsType = FileSystemType.FILE;
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown file system
for location: " + location);
+ }
+ return fsType;
+ }
+
+ /**
+ * provide file type for BE.
+ * @param location the location is from fs.listFile
+ * @return on BE, we will use TFileType to get the suitable client to
access storage.
+ */
+ public static TFileType getTFileType(String location) {
+ if (location == null || location.isEmpty()) {
+ return null;
+ }
+ LocationPath locationPath = new LocationPath(location);
+ switch (locationPath.getLocationType()) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // now we only support S3 client for object storage on BE
+ return TFileType.FILE_S3;
+ case HDFS:
+ case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib
to access oss.
+ case VIEWFS:
+ case COSN:
+ return TFileType.FILE_HDFS;
+ case GFS:
+ case JFS:
+ case OFS:
+ return TFileType.FILE_BROKER;
+ case LOCAL:
+ return TFileType.FILE_LOCAL;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * The converted path is used for BE
+ * @return BE scan range path
+ */
+ public Path toScanRangeLocation() {
+ switch (locationType) {
+ case S3:
+ case S3A:
+ case S3N:
+ case COS:
+ case OSS:
+ case OBS:
+ case BOS:
+ case GCS:
+ // All storage will use s3 client to access on BE, so need
convert to s3
+ return new Path(convertToS3(location));
+ case HDFS:
+ case OSS_HDFS:
+ case VIEWFS:
+ case COSN:
+ case GFS:
+ case JFS:
+ case OFS:
+ case LOCAL:
+ default:
+ return getPath();
+ }
+ }
+
+ public LocationType getLocationType() {
+ return locationType;
+ }
+
+ public String get() {
+ return location;
+ }
+
+ public Path getPath() {
+ return new Path(location);
+ }
+
+ @Override
+ public String toString() {
+ return get();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
index 06d205ff221..318e2bac30a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java
@@ -25,6 +25,7 @@ import java.util.Map;
public class PaimonProperties {
public static final String WAREHOUSE = "warehouse";
+ public static final String FILE_FORMAT = "file.format";
public static final String PAIMON_PREFIX = "paimon";
public static final String PAIMON_CATALOG_TYPE = "metastore";
public static final String HIVE_METASTORE_URIS = "uri";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
index 4ba87c878e4..678a0d77197 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
@@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AbstractFileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.InstantiationUtil;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -102,6 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
+ org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
+ if (split != null) {
+ fileDesc.setPaimonSplit(encodeObjectToString(split));
+ }
+ fileDesc.setFileFormat(source.getFileFormat());
fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot ->
slot.getColumn().getName())
@@ -127,13 +138,52 @@ public class PaimonScanNode extends FileQueryScanNode {
List<org.apache.paimon.table.source.Split> paimonSplits =
readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
+ boolean supportNative = supportNativeReader();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
- PaimonSplit paimonSplit = new PaimonSplit(split);
- splits.add(paimonSplit);
+ if (supportNative && split instanceof DataSplit) {
+ DataSplit dataSplit = (DataSplit) split;
+ Optional<List<RawFile>> optRowFiles =
dataSplit.convertToRawFiles();
+ if (optRowFiles.isPresent()) {
+ List<RawFile> rawFiles = optRowFiles.get();
+ for (RawFile file : rawFiles) {
+ LocationPath locationPath = new
LocationPath(file.path(), source.getCatalog().getProperties());
+ Path finalDataFilePath =
locationPath.toScanRangeLocation();
+ try {
+ splits.addAll(
+ splitFile(
+ finalDataFilePath,
+ 0,
+ null,
+ file.length(),
+ -1,
+ true,
+ null,
+
PaimonSplit.PaimonSplitCreator.DEFAULT));
+ } catch (IOException e) {
+ throw new UserException("Paimon error to split
file: " + e.getMessage(), e);
+ }
+ }
+ } else {
+ splits.add(new PaimonSplit(split));
+ }
+ } else {
+ splits.add(new PaimonSplit(split));
+ }
}
return splits;
}
+ private boolean supportNativeReader() {
+ String fileFormat = source.getFileFormat().toLowerCase();
+ switch (fileFormat) {
+ case "orc":
+ case "parquet":
+ return true;
+ default:
+ return false;
+ }
+ }
+
//When calling 'setPaimonParams' and 'getSplits', the column trimming has
not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
@@ -157,8 +207,8 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws DdlException,
MetaNotFoundException {
- //todo: no use
- return TFileType.FILE_S3;
+        return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
+                new DdlException("Unknown file location " + location + " for paimon table "));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
index 2f55e30c086..fa838350c82 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
@@ -61,4 +62,8 @@ public class PaimonSource {
public ExternalCatalog getCatalog() {
return paimonExtTable.getCatalog();
}
+
+    public String getFileFormat() {
+        return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
index 8ecf539db91..13263fb5842 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java
@@ -18,21 +18,30 @@
package org.apache.doris.planner.external.paimon;
import org.apache.doris.planner.external.FileSplit;
+import org.apache.doris.planner.external.SplitCreator;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.table.source.Split;
+import java.util.List;
+
public class PaimonSplit extends FileSplit {
private Split split;
private TableFormatType tableFormatType;
public PaimonSplit(Split split) {
- super(new Path("dummyPath"), 0, 0, 0, null, null);
+ super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
}
+ public PaimonSplit(Path file, long start, long length, long fileLength,
String[] hosts,
+ List<String> partitionList) {
+ super(file, start, length, fileLength, hosts, partitionList);
+ this.tableFormatType = TableFormatType.PAIMON;
+ }
+
public Split getSplit() {
return split;
}
@@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit {
this.tableFormatType = tableFormatType;
}
+ public static class PaimonSplitCreator implements SplitCreator {
+
+ static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
+
+ @Override
+ public org.apache.doris.spi.Split create(Path path,
+ long start,
+ long length,
+ long fileLength,
+ long modificationTime,
+ String[] hosts,
+ List<String> partitionValues)
{
+ return new PaimonSplit(path, start, length, fileLength, hosts,
partitionValues);
+ }
+ }
}
diff --git a/fe/pom.xml b/fe/pom.xml
index 4f4de40a670..ad126d7111e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -221,7 +221,7 @@ under the License.
<doris.home>${fe.dir}/../</doris.home>
<revision>1.2-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <doris.hive.catalog.shade.version>1.0.2</doris.hive.catalog.shade.version>
+        <doris.hive.catalog.shade.version>1.0.3</doris.hive.catalog.shade.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!--plugin parameters-->
@@ -342,7 +342,7 @@ under the License.
<!--todo waiting release-->
<quartz.version>2.3.2</quartz.version>
<!-- paimon -->
- <paimon.version>0.5.0-incubating</paimon.version>
+ <paimon.version>0.6.0-incubating</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<trino.parser.version>395</trino.parser.version>
<!-- arrow flight sql -->
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index a57745e78d3..506dd6e3f4f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -308,6 +308,7 @@ struct TPaimonFileDesc {
8: optional i64 db_id
9: optional i64 tbl_id
10: optional i64 last_update_time
+ 11: optional string file_format
}
struct TMaxComputeFileDesc {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]