This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 64348055a1 [improvement](iceberg) Optimize the split to the user-specified size #22078 64348055a1 is described below commit 64348055a1b449b018018123b4828c5974591838 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Mon Jul 24 08:48:10 2023 +0800 [improvement](iceberg) Optimize the split to the user-specified size #22078 According to the specified split size, the split tasks are merged to keep a single task near the expected size. --- .../planner/external/iceberg/IcebergScanNode.java | 24 ++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 2de2f8291c..3d3634fb66 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -47,6 +47,7 @@ import org.apache.doris.thrift.TTableFormatFileDesc; import avro.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; @@ -57,8 +58,11 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.util.TableScanUtil; +import java.io.IOException; import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; @@ -179,21 +183,29 @@ public class IcebergScanNode extends FileQueryScanNode { int formatVersion = ((BaseTable) table).operations().current().formatVersion(); // Min split size is DEFAULT_SPLIT_SIZE(128MB). long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); - for (FileScanTask task : scan.planFiles()) { - long fileSize = task.file().fileSizeInBytes(); - for (FileScanTask splitTask : task.split(splitSize)) { + CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); + try (CloseableIterable<CombinedScanTask> combinedScanTasks = + TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { + combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { String dataFilePath = splitTask.file().path().toString(); Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath); - IcebergSplit split = new IcebergSplit(finalDataFilePath, splitTask.start(), - splitTask.length(), fileSize, new String[0]); + IcebergSplit split = new IcebergSplit( + finalDataFilePath, + splitTask.start(), + splitTask.length(), + splitTask.file().fileSizeInBytes(), + new String[0]); split.setFormatVersion(formatVersion); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } split.setTableFormatType(TableFormatType.ICEBERG); splits.add(split); - } + })); + } catch (IOException e) { + throw new UserException(e.getMessage(), e.getCause()); } + return splits; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org