This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new af0c1acb351 [fix](catalog) opt the count pushdown rule for 
iceberg/paimon/hive scan node (#44038) (#45224)
af0c1acb351 is described below

commit af0c1acb351b44d9edbb1ded9d7a223dfbfeabd0
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Sat Dec 14 20:32:06 2024 -0800

    [fix](catalog) opt the count pushdown rule for iceberg/paimon/hive scan 
node (#44038) (#45224)
    
    bp #44038
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  18 ++--
 be/src/vec/exec/format/table/iceberg_reader.h      |   4 +-
 be/src/vec/exec/format/table/paimon_reader.cpp     |   3 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |   4 +
 .../doris/analysis/TableValuedFunctionRef.java     |   5 +-
 .../apache/doris/datasource/FileQueryScanNode.java |  27 +++++-
 .../org/apache/doris/datasource/FileScanNode.java  |  79 ----------------
 .../org/apache/doris/datasource/FileSplit.java     |   5 +-
 .../org/apache/doris/datasource/FileSplitter.java  | 104 +++++++++++++++++++++
 .../apache/doris/datasource/SplitAssignment.java   |   2 +-
 .../org/apache/doris/datasource/SplitCreator.java  |   2 +-
 .../apache/doris/datasource/SplitGenerator.java    |   5 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  52 ++++++++---
 .../doris/datasource/hive/source/HiveSplit.java    |   6 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   8 +-
 .../datasource/iceberg/source/IcebergScanNode.java |  42 +++++----
 .../datasource/iceberg/source/IcebergSplit.java    |  11 +--
 .../lakesoul/source/LakeSoulScanNode.java          |   7 +-
 .../maxcompute/source/MaxComputeScanNode.java      |  28 +++---
 .../datasource/paimon/source/PaimonScanNode.java   |  65 +++++++------
 .../datasource/paimon/source/PaimonSplit.java      |   7 +-
 .../source/TrinoConnectorScanNode.java             |   9 +-
 .../doris/datasource/tvf/source/TVFScanNode.java   |  22 ++++-
 .../glue/translator/PhysicalPlanTranslator.java    |  20 ++--
 .../apache/doris/planner/SingleNodePlanner.java    |  24 ++---
 .../tablefunction/DataGenTableValuedFunction.java  |   3 +-
 .../ExternalFileTableValuedFunction.java           |   5 +-
 .../GroupCommitTableValuedFunction.java            |   3 +-
 .../tablefunction/JdbcQueryTableValueFunction.java |   3 +-
 .../tablefunction/MetadataTableValuedFunction.java |   3 +-
 .../tablefunction/QueryTableValueFunction.java     |   3 +-
 .../doris/tablefunction/TableValuedFunctionIf.java |   3 +-
 .../paimon/test_paimon_catalog.out                 |  80 ++++++++++++++++
 .../iceberg/test_iceberg_optimize_count.groovy     |   2 +-
 .../paimon/test_paimon_catalog.groovy              |  14 +++
 35 files changed, 450 insertions(+), 228 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 8f130ca6002..837269b0bb3 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -96,25 +96,25 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
     _iceberg_profile.delete_rows_sort_time =
             ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
     if (range.table_format_params.iceberg_params.__isset.row_count) {
-        _remaining_push_down_count = 
range.table_format_params.iceberg_params.row_count;
+        _remaining_table_level_row_count = 
range.table_format_params.iceberg_params.row_count;
     } else {
-        _remaining_push_down_count = -1;
+        _remaining_table_level_row_count = -1;
     }
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
     // already get rows from be
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_push_down_count > 0) {
-        auto rows =
-                std::min(_remaining_push_down_count, 
(int64_t)_state->query_options().batch_size);
-        _remaining_push_down_count -= rows;
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count > 0) {
+        auto rows = std::min(_remaining_table_level_row_count,
+                             (int64_t)_state->query_options().batch_size);
+        _remaining_table_level_row_count -= rows;
         auto mutate_columns = block->mutate_columns();
         for (auto& col : mutate_columns) {
             col->resize(rows);
         }
         block->set_columns(std::move(mutate_columns));
         *read_rows = rows;
-        if (_remaining_push_down_count == 0) {
+        if (_remaining_table_level_row_count == 0) {
             *eof = true;
         }
 
@@ -164,7 +164,7 @@ Status IcebergTableReader::get_columns(
 
 Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, 
io::IOContext* io_ctx) {
     // We get the count value by doris's be, so we don't need to read the 
delete file
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_push_down_count > 0) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_table_level_row_count > 0) {
         return Status::OK();
     }
 
@@ -187,9 +187,11 @@ Status IcebergTableReader::init_row_filters(const 
TFileRangeDesc& range, io::IOC
     if (position_delete_files.size() > 0) {
         RETURN_IF_ERROR(
                 _position_delete_base(table_desc.original_file_path, 
position_delete_files));
+        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
     }
     if (equality_delete_files.size() > 0) {
         RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
+        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
     }
 
     COUNTER_UPDATE(_iceberg_profile.num_delete_files, 
table_desc.delete_files.size());
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 2e240f465b6..ee7dcdd68d2 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -167,7 +167,9 @@ protected:
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
 
-    int64_t _remaining_push_down_count;
+    // the table level row count for optimizing query like:
+    // select count(*) from table;
+    int64_t _remaining_table_level_row_count;
     Fileformat _file_format = Fileformat::NONE;
 
     const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp 
b/be/src/vec/exec/format/table/paimon_reader.cpp
index 263fdc8014b..055d6179b2c 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -40,6 +40,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& 
range, io::IOContext
         return Status::OK();
     }
 
+    // set push down agg type to NONE because we can not do count push down opt
+    // if there are delete files.
+    _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
     const auto& deletion_file = table_desc.deletion_file;
     io::FileSystemProperties properties = {
             .system_type = _params.file_type,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 296e59f8df1..d53bb105c70 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -788,6 +788,9 @@ Status VFileScanner::_get_next_reader() {
                     _should_enable_file_meta_cache() ? 
ExecEnv::GetInstance()->file_meta_cache()
                                                      : nullptr,
                     _state->query_options().enable_parquet_lazy_mat);
+            // ATTN: the push down agg type may be set back to NONE,
+            // see IcebergTableReader::init_row_filters for example.
+            parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
             {
                 SCOPED_TIMER(_open_reader_timer);
                 RETURN_IF_ERROR(parquet_reader->open());
@@ -853,6 +856,7 @@ Status VFileScanner::_get_next_reader() {
                     _profile, _state, *_params, range, 
_state->query_options().batch_size,
                     _state->timezone(), _io_ctx.get(), 
_state->query_options().enable_orc_lazy_mat,
                     unsupported_pushdown_types);
+            orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
             if (push_down_predicates) {
                 RETURN_IF_ERROR(_process_late_arrival_conjuncts());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
index b1e7c7c89e9..9eacb8a0422 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableValuedFunctionRef.java
@@ -26,6 +26,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.tablefunction.BackendsTableValuedFunction;
 import org.apache.doris.tablefunction.LocalTableValuedFunction;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -119,8 +120,8 @@ public class TableValuedFunctionRef extends TableRef {
         analyzeJoin(analyzer);
     }
 
-    public ScanNode getScanNode(PlanNodeId id) {
-        return tableFunction.getScanNode(id, desc);
+    public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
+        return tableFunction.getScanNode(id, desc, sv);
     }
 
     public TableValuedFunctionIf getTableFunction() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 5ea2a2637d8..4a071fa6682 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hive.source.HiveSplit;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected String brokerName;
 
     protected TableSnapshot tableSnapshot;
+    // Save the reference of session variable, so that we don't need to get it 
from connection context.
+    // connection context is a thread local variable, it is not available is 
running in other thread.
+    protected SessionVariable sessionVariable;
 
     /**
      * External file scan node for Query hms table
@@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
     public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
-                             StatisticalType statisticalType, boolean 
needCheckColumnPriv) {
+            StatisticalType statisticalType, boolean needCheckColumnPriv,
+            SessionVariable sv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+        this.sessionVariable = sv;
     }
 
     @Override
@@ -112,7 +118,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
         }
         super.init(analyzer);
-        initFileSplitSize();
         doInitialize();
         if (ConnectContext.get().getExecutor() != null) {
             
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeFinishTime();
@@ -314,6 +319,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             params.setProperties(locationProperties);
         }
 
+        int numBackends = backendPolicy.numBackends();
         List<String> pathPartitionKeys = getPathPartitionKeys();
         if (isBatchMode()) {
             // File splits are generated lazily, and fetched by backends while 
scanning.
@@ -356,7 +362,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 scanBackendIds.add(backend.getId());
             }
         } else {
-            List<Split> inputSplits = getSplits();
+            List<Split> inputSplits = getSplits(numBackends);
             if (ConnectContext.get().getExecutor() != null) {
                 
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
             }
@@ -605,4 +611,19 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         }
         return this.tableSnapshot;
     }
+
+    /**
+     * The real file split size is determined by:
+     * 1. If user specify the split size in session variable 
`file_split_size`, use user specified value.
+     * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
+     * @param blockSize, got from file system, eg, hdfs
+     * @return the real file split size
+     */
+    protected long getRealFileSplitSize(long blockSize) {
+        long realSplitSize = sessionVariable.getFileSplitSize();
+        if (realSplitSize <= 0) {
+            realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
+        }
+        return realSplitSize;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 29fdb2b09ac..b7d34312313 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -25,15 +25,10 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanNode;
 import org.apache.doris.thrift.TFileScanRangeParams;
@@ -46,11 +41,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -68,8 +61,6 @@ public abstract class FileScanNode extends ExternalScanNode {
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
-    protected long fileSplitSize;
-    protected boolean isSplitSizeSetBySession = false;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -77,19 +68,6 @@ public abstract class FileScanNode extends ExternalScanNode {
         this.needCheckColumnPriv = needCheckColumnPriv;
     }
 
-    @Override
-    public void init() throws UserException {
-        initFileSplitSize();
-    }
-
-    protected void initFileSplitSize() {
-        this.fileSplitSize = 
ConnectContext.get().getSessionVariable().getFileSplitSize();
-        this.isSplitSizeSetBySession = this.fileSplitSize > 0;
-        if (this.fileSplitSize <= 0) {
-            this.fileSplitSize = DEFAULT_SPLIT_SIZE;
-        }
-    }
-
     @Override
     protected void toThrift(TPlanNode planNode) {
         planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
@@ -256,61 +234,4 @@ public abstract class FileScanNode extends 
ExternalScanNode {
             }
         }
     }
-
-    protected List<Split> splitFile(LocationPath path, long blockSize, 
BlockLocation[] blockLocations, long length,
-            long modificationTime, boolean splittable, List<String> 
partitionValues, SplitCreator splitCreator)
-            throws IOException {
-        if (blockLocations == null) {
-            blockLocations = new BlockLocation[0];
-        }
-        List<Split> result = Lists.newArrayList();
-        TFileCompressType compressType = 
Util.inferFileCompressTypeByPath(path.get());
-        if (!splittable || compressType != TFileCompressType.PLAIN) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Path {} is not splittable.", path);
-            }
-            String[] hosts = blockLocations.length == 0 ? null : 
blockLocations[0].getHosts();
-            result.add(splitCreator.create(path, 0, length, length, 
modificationTime, hosts, partitionValues));
-            return result;
-        }
-        // if file split size is set by session variable, use session variable.
-        // Otherwise, use max(file split size, block size)
-        if (!isSplitSizeSetBySession) {
-            fileSplitSize = Math.max(fileSplitSize, blockSize);
-        }
-        long bytesRemaining;
-        for (bytesRemaining = length; (double) bytesRemaining / (double) 
fileSplitSize > 1.1D;
-                bytesRemaining -= fileSplitSize) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(splitCreator.create(path, length - bytesRemaining, 
fileSplitSize,
-                    length, modificationTime, hosts, partitionValues));
-        }
-        if (bytesRemaining != 0L) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(splitCreator.create(path, length - bytesRemaining, 
bytesRemaining,
-                    length, modificationTime, hosts, partitionValues));
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Path {} includes {} splits.", path, result.size());
-        }
-        return result;
-    }
-
-    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
-        if (blkLocations == null || blkLocations.length == 0) {
-            return -1;
-        }
-        for (int i = 0; i < blkLocations.length; ++i) {
-            if (blkLocations[i].getOffset() <= offset
-                    && offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength()) {
-                return i;
-            }
-        }
-        BlockLocation last = blkLocations[blkLocations.length - 1];
-        long fileLength = last.getOffset() + last.getLength() - 1L;
-        throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 1ebb390e904..37e66c7056f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -87,9 +87,12 @@ public class FileSplit implements Split {
 
         @Override
         public Split create(LocationPath path, long start, long length, long 
fileLength,
+                long fileSplitSize,
                 long modificationTime, String[] hosts,
                 List<String> partitionValues) {
-            return new FileSplit(path, start, length, fileLength, 
modificationTime, hosts, partitionValues);
+            FileSplit split = new FileSplit(path, start, length, fileLength, 
modificationTime, hosts, partitionValues);
+            split.setTargetSplitSize(fileSplitSize);
+            return split;
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
new file mode 100644
index 00000000000..b923c87d3ac
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.spi.Split;
+import org.apache.doris.thrift.TFileCompressType;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+public class FileSplitter {
+    private static final Logger LOG = LogManager.getLogger(FileSplitter.class);
+
+    // If the number of files is larger than parallel instances * num of 
backends,
+    // we don't need to split the file.
+    // Otherwise, split the file to avoid local shuffle.
+    public static boolean needSplitForCountPushdown(int parallelism, int 
numBackends, long totalFileNum) {
+        return totalFileNum < parallelism * numBackends;
+    }
+
+    public static List<Split> splitFile(
+            LocationPath path,
+            long fileSplitSize,
+            BlockLocation[] blockLocations,
+            long length,
+            long modificationTime,
+            boolean splittable,
+            List<String> partitionValues,
+            SplitCreator splitCreator)
+            throws IOException {
+        if (blockLocations == null) {
+            blockLocations = new BlockLocation[0];
+        }
+        List<Split> result = Lists.newArrayList();
+        TFileCompressType compressType = 
Util.inferFileCompressTypeByPath(path.get());
+        if (!splittable || compressType != TFileCompressType.PLAIN) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Path {} is not splittable.", path);
+            }
+            String[] hosts = blockLocations.length == 0 ? null : 
blockLocations[0].getHosts();
+            result.add(splitCreator.create(path, 0, length, length, 
fileSplitSize,
+                    modificationTime, hosts, partitionValues));
+            return result;
+        }
+        long bytesRemaining;
+        for (bytesRemaining = length; (double) bytesRemaining / (double) 
fileSplitSize > 1.1D;
+                bytesRemaining -= fileSplitSize) {
+            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+            result.add(splitCreator.create(path, length - bytesRemaining, 
fileSplitSize,
+                    length, fileSplitSize, modificationTime, hosts, 
partitionValues));
+        }
+        if (bytesRemaining != 0L) {
+            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+            result.add(splitCreator.create(path, length - bytesRemaining, 
bytesRemaining,
+                    length, fileSplitSize, modificationTime, hosts, 
partitionValues));
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Path {} includes {} splits.", path, result.size());
+        }
+        return result;
+    }
+
+    private static int getBlockIndex(BlockLocation[] blkLocations, long 
offset) {
+        if (blkLocations == null || blkLocations.length == 0) {
+            return -1;
+        }
+        for (int i = 0; i < blkLocations.length; ++i) {
+            if (blkLocations[i].getOffset() <= offset
+                    && offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength()) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1L;
+        throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
+    }
+
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index 928854b91d1..a26abc7fc5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -69,7 +69,7 @@ public class SplitAssignment {
     }
 
     public void init() throws UserException {
-        splitGenerator.startSplit();
+        splitGenerator.startSplit(backendPolicy.numBackends());
         synchronized (assignLock) {
             while (sampleSplit == null && waitFirstSplit()) {
                 try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
index 4df30459db7..6df84d2f0f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitCreator.java
@@ -23,6 +23,6 @@ import org.apache.doris.spi.Split;
 import java.util.List;
 
 public interface SplitCreator {
-    Split create(LocationPath path, long start, long length, long fileLength,
+    Split create(LocationPath path, long start, long length, long fileLength, 
long fileSplitSize,
             long modificationTime, String[] hosts, List<String> 
partitionValues);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index c4a373bc85b..34ff3911445 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -30,8 +30,9 @@ import java.util.List;
 public interface SplitGenerator {
     /**
      * Get all file splits if the producer doesn't support batch mode.
+     * @param numBackends the number of backends, this is useful when 
determine the number of splits.
      */
-    default List<Split> getSplits() throws UserException {
+    default List<Split> getSplits(int numBackends) throws UserException {
         // todo: remove this interface if batch mode is stable
         throw new NotImplementedException("Not implement");
     }
@@ -51,7 +52,7 @@ public interface SplitGenerator {
         return -1;
     }
 
-    default void startSplit() {
+    default void startSplit(int numBackends) {
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 99d3cd1cd21..35b21c368ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -44,12 +45,14 @@ import 
org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TPushAggOp;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -98,15 +101,13 @@ public class HiveScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, 
needCheckColumnPriv);
-        hmsTable = (HMSExternalTable) desc.getTable();
-        brokerName = hmsTable.getCatalog().bindBrokerName();
+    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
+        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, 
needCheckColumnPriv, sv);
     }
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
-                        StatisticalType statisticalType, boolean 
needCheckColumnPriv) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+            StatisticalType statisticalType, boolean needCheckColumnPriv, 
SessionVariable sv) {
+        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, 
sv);
         hmsTable = (HMSExternalTable) desc.getTable();
         brokerName = hmsTable.getCatalog().bindBrokerName();
     }
@@ -163,7 +164,7 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         long start = System.currentTimeMillis();
         try {
             if (!partitionInit) {
@@ -174,7 +175,7 @@ public class HiveScanNode extends FileQueryScanNode {
                     .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
             String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName);
+            getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName, numBackends);
             if (ConnectContext.get().getExecutor() != null) {
                 
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
             }
@@ -193,7 +194,7 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public void startSplit() {
+    public void startSplit(int numBackends) {
         if (prunedPartitions.isEmpty()) {
             splitAssignment.finishSchedule();
             return;
@@ -214,12 +215,12 @@ public class HiveScanNode extends FileQueryScanNode {
                         try {
                             List<Split> allFiles = Lists.newArrayList();
                             getFileSplitByPartitions(
-                                    cache, 
Collections.singletonList(partition), allFiles, bindBrokerName);
+                                    cache, 
Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
                             if (allFiles.size() > numSplitsPerPartition.get()) 
{
                                 numSplitsPerPartition.set(allFiles.size());
                             }
                             splitAssignment.addToQueue(allFiles);
-                        } catch (IOException e) {
+                        } catch (Exception e) {
                             batchException.set(new 
UserException(e.getMessage(), e));
                         } finally {
                             splittersOnFlight.release();
@@ -263,7 +264,7 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
-                                          List<Split> allFiles, String 
bindBrokerName) throws IOException {
+            List<Split> allFiles, String bindBrokerName, int numBackends) 
throws IOException, UserException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
             fileCaches = getFileSplitByTransaction(cache, partitions, 
bindBrokerName);
@@ -276,11 +277,34 @@ public class HiveScanNode extends FileQueryScanNode {
             splitAllFiles(allFiles, hiveFileStatuses);
             return;
         }
+
+        /**
+         * If the push down aggregation operator is COUNT,
+         * we don't need to split the file because for parquet/orc format, 
only metadata is read.
+         * If we split the file, we will read metadata of a file multiple 
times, which is not efficient.
+         *
+         * - Hive Transactional Table may need merge on read, so do not apply 
this optimization.
+         * - If the file format is not parquet/orc, eg, text, we need to split 
the file to increase the parallelism.
+         */
+        boolean needSplit = true;
+        if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT
+                && hiveTransaction != null) {
+            int totalFileNum = 0;
+            for (FileCacheValue fileCacheValue : fileCaches) {
+                if (fileCacheValue.getFiles() != null) {
+                    totalFileNum += fileCacheValue.getFiles().size();
+                }
+            }
+            int parallelNum = sessionVariable.getParallelExecInstanceNum();
+            needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
+        }
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() != null) {
                 boolean isSplittable = fileCacheValue.isSplittable();
                 for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
-                    allFiles.addAll(splitFile(status.getPath(), 
status.getBlockSize(),
+                    allFiles.addAll(FileSplitter.splitFile(status.getPath(),
+                            // set block size to Long.MAX_VALUE to avoid 
splitting the file.
+                            getRealFileSplitSize(needSplit ? 
status.getBlockSize() : Long.MAX_VALUE),
                             status.getBlockLocations(), status.getLength(), 
status.getModificationTime(),
                             isSplittable, fileCacheValue.getPartitionValues(),
                             new 
HiveSplitCreator(fileCacheValue.getAcidInfo())));
@@ -292,7 +316,7 @@ public class HiveScanNode extends FileQueryScanNode {
     private void splitAllFiles(List<Split> allFiles,
                                List<HiveMetaStoreCache.HiveFileStatus> 
hiveFileStatuses) throws IOException {
         for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
-            allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
+            allFiles.addAll(FileSplitter.splitFile(status.getPath(), 
getRealFileSplitSize(status.getBlockSize()),
                     status.getBlockLocations(), status.getLength(), 
status.getModificationTime(),
                     status.isSplittable(), status.getPartitionValues(),
                     new HiveSplitCreator(status.getAcidInfo())));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
index 5dd63e734c9..58bfb32e617 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveSplit.java
@@ -54,9 +54,13 @@ public class HiveSplit extends FileSplit {
 
         @Override
         public Split create(LocationPath path, long start, long length, long 
fileLength,
+                long fileSplitSize,
                 long modificationTime, String[] hosts,
                 List<String> partitionValues) {
-            return new HiveSplit(path, start, length, fileLength, 
modificationTime, hosts, partitionValues, acidInfo);
+            HiveSplit split =  new HiveSplit(path, start, length, fileLength, 
modificationTime,
+                    hosts, partitionValues, acidInfo);
+            split.setTargetSplitSize(fileSplitSize);
+            return split;
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index b2cad8ab710..e1dfaa40aef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -125,8 +125,8 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
             Optional<TableScanParams> scanParams, 
Optional<IncrementalRelation> incrementalRelation,
-            SessionVariable sessionVariable) {
-        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv);
+            SessionVariable sv) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv, sv);
         isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
             if (isCowTable) {
@@ -390,7 +390,7 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
             return getIncrementalSplits();
         }
@@ -406,7 +406,7 @@ public class HudiScanNode extends HiveScanNode {
     }
 
     @Override
-    public void startSplit() {
+    public void startSplit(int numBackends) {
         if (prunedPartitions.isEmpty()) {
             splitAssignment.finishSchedule();
             return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f7b58158d1a..c78140b9d3c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -36,7 +36,7 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -87,7 +87,13 @@ public class IcebergScanNode extends FileQueryScanNode {
     private IcebergSource source;
     private Table icebergTable;
     private List<String> pushdownIcebergPredicates = Lists.newArrayList();
-    private boolean pushDownCount = false;
+    // If tableLevelPushDownCount is true, means we can do count push down opt 
at table level.
+    // which means all splits have no position/equality delete files,
+    // so for query like "select count(*) from ice_tbl", we can get count from 
snapshot row count info directly.
+    // If tableLevelPushDownCount is false, means we can't do count push down 
opt at table level,
+    // But for part of splits which have no position/equality delete files, we 
can still do count push down opt.
+    // And for split level count push down opt, the flag is set in each split.
+    private boolean tableLevelPushDownCount = false;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
 
     /**
@@ -96,8 +102,8 @@ public class IcebergScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
+        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv);
 
         ExternalTable table = (ExternalTable) desc.getTable();
         if (table instanceof HMSExternalTable) {
@@ -140,8 +146,8 @@ public class IcebergScanNode extends FileQueryScanNode {
         int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
-        if (pushDownCount) {
-            fileDesc.setRowCount(icebergSplit.getRowCount());
+        if (tableLevelPushDownCount) {
+            fileDesc.setRowCount(icebergSplit.getTableLevelRowCount());
         }
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
@@ -177,11 +183,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
-        return 
HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), 
this::doGetSplits);
+    public List<Split> getSplits(int numBackends) throws UserException {
+        return 
HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(),
+                () -> doGetSplits(numBackends));
     }
 
-    private List<Split> doGetSplits() throws UserException {
+    private List<Split> doGetSplits(int numBackends) throws UserException {
         TableScan scan = icebergTable.newScan();
 
         // set snapshot
@@ -209,9 +216,10 @@ public class IcebergScanNode extends FileQueryScanNode {
         HashSet<String> partitionPathSet = new HashSet<>();
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
-        CloseableIterable<FileScanTask> fileScanTasks = 
TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize);
+        long realFileSplitSize = getRealFileSplitSize(0);
+        CloseableIterable<FileScanTask> fileScanTasks = 
TableScanUtil.splitFiles(scan.planFiles(), realFileSplitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
+                TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 1, 
0)) {
             combinedScanTasks.forEach(taskGrp -> 
taskGrp.files().forEach(splitTask -> {
                 List<String> partitionValues = new ArrayList<>();
                 if (isPartitionedTable) {
@@ -250,6 +258,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                         source.getCatalog().getProperties(),
                         partitionValues,
                         originalPath);
+                split.setTargetSplitSize(realFileSplitSize);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
@@ -268,11 +277,11 @@ public class IcebergScanNode extends FileQueryScanNode {
             }
             long countFromSnapshot = getCountFromSnapshot();
             if (countFromSnapshot >= 0) {
-                pushDownCount = true;
+                tableLevelPushDownCount = true;
                 List<Split> pushDownCountSplits;
                 if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
-                    int parallelNum = 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
-                    pushDownCountSplits = splits.subList(0, 
Math.min(splits.size(), parallelNum));
+                    int minSplits = 
sessionVariable.getParallelExecInstanceNum() * numBackends;
+                    pushDownCountSplits = splits.subList(0, 
Math.min(splits.size(), minSplits));
                 } else {
                     pushDownCountSplits = 
Collections.singletonList(splits.get(0));
                 }
@@ -282,7 +291,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         selectedPartitionNum = partitionPathSet.size();
-        splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
 
@@ -422,8 +430,8 @@ public class IcebergScanNode extends FileQueryScanNode {
         int size = splits.size();
         long countPerSplit = totalCount / size;
         for (int i = 0; i < size - 1; i++) {
-            ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
+            ((IcebergSplit) 
splits.get(i)).setTableLevelRowCount(countPerSplit);
         }
-        ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + 
totalCount % size);
+        ((IcebergSplit) splits.get(size - 
1)).setTableLevelRowCount(countPerSplit + totalCount % size);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 580d3cf1bb2..0520612935a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -37,7 +37,8 @@ public class IcebergSplit extends FileSplit {
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
     private Map<String, String> config;
-    private long rowCount = -1;
+    // tableLevelRowCount will be set only table-level count push down opt is 
available.
+    private long tableLevelRowCount = -1;
 
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(LocationPath file, long start, long length, long 
fileLength, String[] hosts,
@@ -50,14 +51,6 @@ public class IcebergSplit extends FileSplit {
         this.selfSplitWeight = length;
     }
 
-    public long getRowCount() {
-        return rowCount;
-    }
-
-    public void setRowCount(long rowCount) {
-        this.rowCount = rowCount;
-    }
-
     public void setDeleteFileFilters(List<IcebergDeleteFileFilter> 
deleteFileFilters) {
         this.deleteFileFilters = deleteFileFilters;
         this.selfSplitWeight += 
deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index a7311055cff..80f037239ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
 import org.apache.doris.datasource.property.constants.MinioProperties;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
@@ -80,8 +81,8 @@ public class LakeSoulScanNode extends FileQueryScanNode {
 
     String readType;
 
-    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv);
+    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
+        super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv, sv);
     }
 
     @Override
@@ -209,7 +210,7 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getSplits with columnFilters={}", columnFilters);
             LOG.debug("getSplits with columnNameToRange={}", 
columnNameToRange);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 9fa22a0fffa..e4bb8b5e9dc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -44,7 +44,7 @@ import 
org.apache.doris.datasource.property.constants.MCProperties;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.nereids.util.DateUtils;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
@@ -107,21 +107,23 @@ public class MaxComputeScanNode extends FileQueryScanNode 
{
 
     // For new planner
     public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
-            SelectedPartitions selectedPartitions, boolean 
needCheckColumnPriv) {
+            SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
+            SessionVariable sv) {
         this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
-                selectedPartitions, needCheckColumnPriv);
+                selectedPartitions, needCheckColumnPriv, sv);
     }
 
     // For old planner
-    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
+            SessionVariable sv) {
         this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
-                SelectedPartitions.NOT_PRUNED, needCheckColumnPriv);
+                SelectedPartitions.NOT_PRUNED, needCheckColumnPriv, sv);
     }
 
     private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
             StatisticalType statisticalType, SelectedPartitions 
selectedPartitions,
-            boolean needCheckColumnPriv) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+            boolean needCheckColumnPriv, SessionVariable sv) {
+        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, 
sv);
         table = (MaxComputeExternalTable) desc.getTable();
         this.selectedPartitions = selectedPartitions;
     }
@@ -214,7 +216,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
             return false;
         }
 
-        int numPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        int numPartitions = sessionVariable.getNumPartitionsInBatchMode();
         return numPartitions > 0
                 && selectedPartitions != SelectedPartitions.NOT_PRUNED
                 && selectedPartitions.selectedPartitions.size() >= 
numPartitions;
@@ -226,7 +228,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public void startSplit() {
+    public void startSplit(int numBackends) {
         this.totalPartitionNum = selectedPartitions.totalPartitionNum;
         this.selectedPartitionNum = 
selectedPartitions.selectedPartitions.size();
 
@@ -241,8 +243,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                 (key, value) -> requiredPartitionSpecs.add(new 
PartitionSpec(key))
         );
 
-
-        int batchNumPartitions = 
ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
+        int batchNumPartitions = sessionVariable.getNumPartitionsInBatchMode();
 
         Executor scheduleExecutor = 
Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
         AtomicReference<UserException> batchException = new 
AtomicReference<>(null);
@@ -546,7 +547,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
         return new HashMap<>();
     }
 
-    List<Split> getSplitByTableSession(TableBatchReadSession 
tableBatchReadSession) throws java.io.IOException {
+    private List<Split> getSplitByTableSession(TableBatchReadSession 
tableBatchReadSession) throws IOException {
         List<Split> result = new ArrayList<>();
         String scanSessionSerialize =  serializeSession(tableBatchReadSession);
         InputSplitAssigner assigner = 
tableBatchReadSession.getInputSplitAssigner();
@@ -595,9 +596,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
         return result;
     }
 
-
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         List<Split> result = new ArrayList<>();
         com.aliyun.odps.Table odpsTable = table.getOdpsTable();
         if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 5009ec3c904..28efbc58f51 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.planner.PlanNodeId;
@@ -36,6 +37,7 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TPaimonDeletionFileDesc;
 import org.apache.doris.thrift.TPaimonFileDesc;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import com.google.common.base.Preconditions;
@@ -101,15 +103,16 @@ public class PaimonScanNode extends FileQueryScanNode {
     private int rawFileSplitNum = 0;
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
-    private SessionVariable sessionVariable;
     private String serializedTable;
 
+    private boolean pushDownCount = false;
+    private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+
     public PaimonScanNode(PlanNodeId id,
             TupleDescriptor desc,
             boolean needCheckColumnPriv,
-            SessionVariable sessionVariable) {
-        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv);
-        this.sessionVariable = sessionVariable;
+            SessionVariable sv) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv, sv);
     }
 
     @Override
@@ -199,7 +202,7 @@ public class PaimonScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
         SessionVariable.IgnoreSplitType ignoreSplitType = 
SessionVariable.IgnoreSplitType
                 .valueOf(sessionVariable.getIgnoreSplitType());
@@ -211,6 +214,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         List<org.apache.paimon.table.source.Split> paimonSplits = 
readBuilder.withFilter(predicates)
                 .withProjection(projected)
                 .newScan().plan().splits();
+
+        boolean applyCountPushdown = getPushDownAggNoGroupingOp() == 
TPushAggOp.COUNT;
         // Just for counting the number of selected partitions for this paimon 
table
         Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
         for (org.apache.paimon.table.source.Split split : paimonSplits) {
@@ -238,9 +243,9 @@ public class PaimonScanNode extends FileQueryScanNode {
                             LocationPath locationPath = new 
LocationPath(file.path(),
                                     source.getCatalog().getProperties());
                             try {
-                                List<Split> dorisSplits = splitFile(
+                                List<Split> dorisSplits = 
FileSplitter.splitFile(
                                         locationPath,
-                                        0,
+                                        getRealFileSplitSize(0),
                                         null,
                                         file.length(),
                                         -1,
@@ -261,25 +266,7 @@ public class PaimonScanNode extends FileQueryScanNode {
                             }
                         }
                     } else {
-                        for (RawFile file : rawFiles) {
-                            LocationPath locationPath = new 
LocationPath(file.path(),
-                                    source.getCatalog().getProperties());
-                            try {
-                                splits.addAll(
-                                        splitFile(
-                                                locationPath,
-                                                0,
-                                                null,
-                                                file.length(),
-                                                -1,
-                                                true,
-                                                null,
-                                                
PaimonSplit.PaimonSplitCreator.DEFAULT));
-                                ++rawFileSplitNum;
-                            } catch (IOException e) {
-                                throw new UserException("Paimon error to split 
file: " + e.getMessage(), e);
-                            }
-                        }
+                        createRawFileSplits(rawFiles, splits, 
applyCountPushdown ? Long.MAX_VALUE : 0);
                     }
                 } else {
                     if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
@@ -297,14 +284,34 @@ public class PaimonScanNode extends FileQueryScanNode {
             }
             splitStats.add(splitStat);
         }
+
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
-        // We should set fileSplitSize at the end because fileSplitSize may be 
modified
-        // in splitFile.
-        splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
 
+    private void createRawFileSplits(List<RawFile> rawFiles, List<Split> 
splits, long blockSize) throws UserException {
+        for (RawFile file : rawFiles) {
+            LocationPath locationPath = new LocationPath(file.path(),
+                    source.getCatalog().getProperties());
+            try {
+                splits.addAll(
+                        FileSplitter.splitFile(
+                                locationPath,
+                                getRealFileSplitSize(blockSize),
+                                null,
+                                file.length(),
+                                -1,
+                                true,
+                                null,
+                                PaimonSplit.PaimonSplitCreator.DEFAULT));
+                ++rawFileSplitNum;
+            } catch (IOException e) {
+                throw new UserException("Paimon error to split file: " + 
e.getMessage(), e);
+            }
+        }
+    }
+
     private String getFileFormat(String path) {
         return 
FileFormatUtils.getFileFormatBySuffix(path).orElse(source.getFileFormatFromTableProperties());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 3ab38c7db28..988f043ad0e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -38,7 +38,6 @@ public class PaimonSplit extends FileSplit {
     private TableFormatType tableFormatType;
     private Optional<DeletionFile> optDeletionFile;
 
-
     public PaimonSplit(Split split) {
         super(DUMMY_PATH, 0, 0, 0, 0, null, null);
         this.split = split;
@@ -100,10 +99,14 @@ public class PaimonSplit extends FileSplit {
                 long start,
                 long length,
                 long fileLength,
+                long fileSplitSize,
                 long modificationTime,
                 String[] hosts,
                 List<String> partitionValues) {
-            return new PaimonSplit(path, start, length, fileLength, 
modificationTime, hosts, partitionValues);
+            PaimonSplit split =  new PaimonSplit(path, start, length, 
fileLength,
+                    modificationTime, hosts, partitionValues);
+            split.setTargetSplitSize(fileSplitSize);
+            return split;
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 6f660993d63..50c1d5752a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileAttributes;
@@ -97,8 +98,10 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
     private ConnectorMetadata connectorMetadata;
     private Constraint constraint;
 
-    public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", 
StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv);
+    public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
+            SessionVariable sv) {
+        super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", 
StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv,
+                sv);
     }
 
     @Override
@@ -129,7 +132,7 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         // 1. Get necessary objects
         Connector connector = source.getConnector();
         connectorMetadata = source.getConnectorMetadata();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index b0f0406c215..5e650930365 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -29,7 +29,9 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.FileSplit.FileSplitCreator;
+import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -40,6 +42,7 @@ import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TPushAggOp;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -63,8 +66,8 @@ public class TVFScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, 
needCheckColumnPriv);
+    public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
+        super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, 
needCheckColumnPriv, sv);
         table = (FunctionGenTable) this.desc.getTable();
         tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
     }
@@ -126,16 +129,27 @@ public class TVFScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public List<Split> getSplits() throws UserException {
+    public List<Split> getSplits(int numBackends) throws UserException {
         List<Split> splits = Lists.newArrayList();
         if (tableValuedFunction.getTFileType() == TFileType.FILE_STREAM) {
             return splits;
         }
+
         List<TBrokerFileStatus> fileStatuses = 
tableValuedFunction.getFileStatuses();
+
+        // Push down count optimization.
+        boolean needSplit = true;
+        if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
+            int parallelNum = sessionVariable.getParallelExecInstanceNum();
+            int totalFileNum = fileStatuses.size();
+            needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
+        }
+
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             Map<String, String> prop = Maps.newHashMap();
             try {
-                splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), 
prop), fileStatus.getBlockSize(),
+                splits.addAll(FileSplitter.splitFile(new 
LocationPath(fileStatus.getPath(), prop),
+                        getRealFileSplitSize(needSplit ? 
fileStatus.getBlockSize() : Long.MAX_VALUE),
                         null, fileStatus.getSize(),
                         fileStatus.getModificationTime(), 
fileStatus.isSplitable, null,
                         FileSplitCreator.DEFAULT));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7f4d23c6130..28b14398c86 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -196,6 +196,7 @@ import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.TableFunctionNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TFetchOption;
@@ -555,15 +556,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         ExternalTable table = fileScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
 
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
         // TODO(cmy): determine the needCheckColumnPriv param
         ScanNode scanNode;
         if (table instanceof HMSExternalTable) {
             switch (((HMSExternalTable) table).getDlaType()) {
                 case ICEBERG:
-                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
                     HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                     
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                     if (fileScan.getTableSample().isPresent()) {
@@ -575,17 +577,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     throw new RuntimeException("do not support DLA type " + 
((HMSExternalTable) table).getDlaType());
             }
         } else if (table instanceof IcebergExternalTable) {
-            scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+            scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
         } else if (table instanceof PaimonExternalTable) {
-            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false,
-                ConnectContext.get().getSessionVariable());
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
         } else if (table instanceof TrinoConnectorExternalTable) {
-            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
         } else if (table instanceof MaxComputeExternalTable) {
             scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
-                    fileScan.getSelectedPartitions(), false);
+                    fileScan.getSelectedPartitions(), false, sv);
         } else if (table instanceof LakeSoulExternalTable) {
-            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
         } else {
             throw new RuntimeException("do not support table type " + 
table.getType());
         }
@@ -911,7 +912,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, 
tvfRelation.getFunction().getTable(), context);
 
         TableValuedFunctionIf catalogFunction = 
tvfRelation.getFunction().getCatalogFunction();
-        ScanNode scanNode = 
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor);
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
+        ScanNode scanNode = 
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
         scanNode.setNereidsId(tvfRelation.getId());
         Utils.execWithUncheckedException(scanNode::init);
         context.getRuntimeTranslator().ifPresent(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index ae5d562bd5b..f29ff4dba32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -75,6 +75,7 @@ import org.apache.doris.datasource.odbc.source.OdbcScanNode;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import 
org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TPushAggOp;
 
@@ -1916,8 +1917,8 @@ public class SingleNodePlanner {
      */
     private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, 
SelectStmt selectStmt)
             throws UserException {
-        ScanNode scanNode = null;
-
+        SessionVariable sv = ConnectContext.get().getSessionVariable();
+        ScanNode scanNode;
         switch (tblRef.getTable().getType()) {
             case OLAP:
             case MATERIALIZED_VIEW:
@@ -1955,7 +1956,7 @@ public class SingleNodePlanner {
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), false);
                 break;
             case TABLE_VALUED_FUNCTION:
-                scanNode = ((TableValuedFunctionRef) 
tblRef).getScanNode(ctx.getNextNodeId());
+                scanNode = ((TableValuedFunctionRef) 
tblRef).getScanNode(ctx.getNextNodeId(), sv);
                 break;
             case HMS_EXTERNAL_TABLE:
                 TableIf table = tblRef.getDesc().getTable();
@@ -1968,13 +1969,13 @@ public class SingleNodePlanner {
                                     + "please set enable_nereids_planner = 
true to enable new optimizer");
                         }
                         scanNode = new HudiScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true,
-                                Optional.empty(), Optional.empty(), 
ConnectContext.get().getSessionVariable());
+                                Optional.empty(), Optional.empty(), sv);
                         break;
                     case ICEBERG:
-                        scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                        scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                         break;
                     case HIVE:
-                        scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                        scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                         ((HiveScanNode) 
scanNode).setTableSample(tblRef.getTableSample());
                         break;
                     default:
@@ -1982,17 +1983,16 @@ public class SingleNodePlanner {
                 }
                 break;
             case ICEBERG_EXTERNAL_TABLE:
-                scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                 break;
             case PAIMON_EXTERNAL_TABLE:
-                scanNode = new PaimonScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true,
-                    ConnectContext.get().getSessionVariable());
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                 break;
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
-                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                 break;
             case MAX_COMPUTE_EXTERNAL_TABLE:
-                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new MaxComputeScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                 break;
             case ES_EXTERNAL_TABLE:
                 scanNode = new EsScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
@@ -2001,7 +2001,7 @@ public class SingleNodePlanner {
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
                 break;
             case LAKESOUl_EXTERNAL_TABLE:
-                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true, sv);
                 break;
             case TEST_EXTERNAL_TABLE:
                 scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), 
tblRef.getDesc());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 629b410e676..66f344f03be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.planner.DataGenScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TDataGenFunctionName;
 
 import java.util.List;
@@ -32,7 +33,7 @@ public abstract class DataGenTableValuedFunction extends 
TableValuedFunctionIf {
     public abstract TDataGenFunctionName getDataGenFunctionName();
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
         return new DataGenScanNode(id, desc, this);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e137d5d200c..6f45a1cc0eb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@ import org.apache.doris.proto.Types.PStructField;
 import org.apache.doris.proto.Types.PTypeDesc;
 import org.apache.doris.proto.Types.PTypeNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -301,8 +302,8 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
-        return new TVFScanNode(id, desc, false);
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
+        return new TVFScanNode(id, desc, false, sv);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 3bd262f467d..324e17d4f24 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -30,6 +30,7 @@ import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TFileType;
 
 import java.util.ArrayList;
@@ -83,7 +84,7 @@ public class GroupCommitTableValuedFunction extends 
ExternalFileTableValuedFunct
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
         return new GroupCommitScanNode(id, desc, tableId);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index b884dab3882..a9847c7eadd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -47,7 +48,7 @@ public class JdbcQueryTableValueFunction extends 
QueryTableValueFunction {
     }
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
         JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
         JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), 
desc.getTable().getFullSchema(),
                 TableType.JDBC);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index a7e25bc7f82..7bd28f363e7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.tvf.source.MetadataScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
@@ -60,7 +61,7 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
     public abstract TMetaScanRange getMetaScanRange();
 
     @Override
-    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
         return new MetadataScanNode(id, desc, this);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
index cb0f5100229..07a125836b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/QueryTableValueFunction.java
@@ -28,6 +28,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -87,5 +88,5 @@ public abstract class QueryTableValueFunction extends 
TableValuedFunctionIf {
     public abstract List<Column> getTableColumns() throws AnalysisException;
 
     @Override
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index d4faa460195..eb323a76672 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import java.util.List;
 import java.util.Map;
@@ -88,7 +89,7 @@ public abstract class TableValuedFunctionIf {
 
     public abstract List<Column> getTableColumns() throws AnalysisException;
 
-    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc);
+    public abstract ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv);
 
     public void checkAuth(ConnectContext ctx) {
 
diff --git 
a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out 
b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index f3b44964915..a394836625d 100644
--- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
+++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
@@ -578,6 +578,26 @@ bbb
 
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3      3_1
+4      4_1
+
+-- !c115 --
+3      3_1
+4      4_1
+
 -- !all --
 1      2       3       4       5       6       7       8       9.1     10.1    
11.10   2020-02-02      13str   14varchar       a       true    aaaa    
2023-08-13T09:32:38.530
 10     20      30      40      50      60      70      80      90.1    100.1   
110.10  2020-03-02      130str  140varchar      b       false   bbbb    
2023-08-14T08:32:52.821
@@ -1157,6 +1177,26 @@ bbb
 
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3      3_1
+4      4_1
+
+-- !c115 --
+3      3_1
+4      4_1
+
 -- !all --
 1      2       3       4       5       6       7       8       9.1     10.1    
11.10   2020-02-02      13str   14varchar       a       true    aaaa    
2023-08-13T09:32:38.530
 10     20      30      40      50      60      70      80      90.1    100.1   
110.10  2020-03-02      130str  140varchar      b       false   bbbb    
2023-08-14T08:32:52.821
@@ -1736,6 +1776,26 @@ bbb
 
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3      3_1
+4      4_1
+
+-- !c115 --
+3      3_1
+4      4_1
+
 -- !all --
 1      2       3       4       5       6       7       8       9.1     10.1    
11.10   2020-02-02      13str   14varchar       a       true    aaaa    
2023-08-13T09:32:38.530
 10     20      30      40      50      60      70      80      90.1    100.1   
110.10  2020-03-02      130str  140varchar      b       false   bbbb    
2023-08-14T08:32:52.821
@@ -2315,3 +2375,23 @@ bbb
 
 -- !c109 --
 
+-- !c110 --
+3
+
+-- !c111 --
+3
+
+-- !c112 --
+2
+
+-- !c113 --
+2
+
+-- !c114 --
+3      3_1
+4      4_1
+
+-- !c115 --
+3      3_1
+4      4_1
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 306af3b2cb2..7a9e90a61fe 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -110,7 +110,7 @@ suite("test_iceberg_optimize_count", 
"p0,external,doris,external_docker,external
 
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
-        sql """drop catalog if exists ${catalog_name}"""
+        // sql """drop catalog if exists ${catalog_name}"""
     }
 }
 
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy 
b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 41afb02e0f9..9668cbb0950 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,6 +181,13 @@ suite("test_paimon_catalog", 
"p0,external,doris,external_docker,external_docker_
             def c108= """ select id from tb_with_upper_case where id = 1 """
             def c109= """ select id from tb_with_upper_case where id < 1 """
 
+            def c110 = """select count(*) from deletion_vector_orc;"""
+            def c111 = """select count(*) from deletion_vector_parquet;"""
+            def c112 = """select count(*) from deletion_vector_orc where id > 
2;"""
+            def c113 = """select count(*) from deletion_vector_parquet where 
id > 2;"""
+            def c114 = """select * from deletion_vector_orc where id > 2;"""
+            def c115 = """select * from deletion_vector_parquet where id > 
2;"""
+
             String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
             String catalog_name = "ctl_test_paimon_catalog"
             String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
@@ -289,6 +296,13 @@ suite("test_paimon_catalog", 
"p0,external,doris,external_docker,external_docker_
                 qt_c107 c107
                 qt_c108 c108
                 qt_c109 c109
+
+                qt_c110 c110
+                qt_c111 c111
+                qt_c112 c112
+                qt_c113 c113
+                qt_c114 c114
+                qt_c115 c115
             }
 
             test_cases("false", "false")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to