This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a8418ccd2c4 [fix](iceberg)The manifest file needs to read the "exists" 
type.  (#50434)
a8418ccd2c4 is described below

commit a8418ccd2c4fc0c9eeb571d5d394c896988c907b
Author: wuwenchi <[email protected]>
AuthorDate: Tue Apr 29 16:21:34 2025 +0800

    [fix](iceberg) The manifest file needs to read the "exists" type. (#50434)
    
    ### What problem does this PR solve?
    
    Related PR: #49489
    
    Problem Summary:
    
    When counting the files referenced by a manifest (used to decide whether
    batch mode should be enabled), entries with the "EXISTING" status must be
    counted in addition to "ADDED" entries; previously only added files were
    counted, which undercounted the total and could wrongly disable batch mode.
---
 .../datasource/iceberg/source/IcebergScanNode.java |  16 +-
 .../iceberg/source/IcebergScanNodeTest.java        | 181 +++++++++++++++++++++
 2 files changed, 194 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index a634aa6c91f..8707f4b06b5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -48,6 +48,7 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -102,6 +103,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     private PreExecutionAuthenticator preExecutionAuthenticator;
     private TableScan icebergTableScan;
 
+    // for test
+    @VisibleForTesting
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
+        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+    }
+
     /**
      * External file scan node for Query iceberg table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
@@ -249,7 +256,8 @@ public class IcebergScanNode extends FileQueryScanNode {
         });
     }
 
-    private TableScan createTableScan() {
+    @VisibleForTesting
+    public TableScan createTableScan() {
         if (icebergTableScan != null) {
             return icebergTableScan;
         }
@@ -368,7 +376,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                                 createTableScan().filter()).iterator()) {
                     int cnt = 0;
                     while (matchingManifest.hasNext()) {
-                        cnt += matchingManifest.next().addedFilesCount();
+                        ManifestFile next = matchingManifest.next();
+                        cnt += next.addedFilesCount() + 
next.existingFilesCount();
                         if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
                             return true;
                         }
@@ -470,7 +479,8 @@ public class IcebergScanNode extends FileQueryScanNode {
         return !col.isAllowNull();
     }
 
-    private long getCountFromSnapshot() {
+    @VisibleForTesting
+    public long getCountFromSnapshot() {
         Long specifiedSnapshot = getSpecifiedSnapshot();
 
         Snapshot snapshot = specifiedSnapshot == null
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..8ae51a61f46
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TPushAggOp;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergScanNodeTest {
+
+    @Mocked
+    HadoopTableOperations hadoopTableOperations;
+    @Mocked
+    Snapshot snapshot;
+
+    @Test
+    public void testIsBatchMode() {
+        SessionVariable sessionVariable = new SessionVariable();
+        IcebergScanNode icebergScanNode = new IcebergScanNode(new 
PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
+
+        new Expectations(icebergScanNode) {{
+                icebergScanNode.getPushDownAggNoGroupingOp();
+                result = TPushAggOp.COUNT;
+                icebergScanNode.getCountFromSnapshot();
+                result = 1L;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        BaseTable mockTable = new BaseTable(hadoopTableOperations, 
"mockTable");
+        new Expectations(icebergScanNode) {{
+                icebergScanNode.getPushDownAggNoGroupingOp();
+                result = TPushAggOp.NONE;
+                Deencapsulation.setField(icebergScanNode, "icebergTable", 
mockTable);
+            }};
+        TableScan tableScan = mockTable.newScan();
+        new Expectations(mockTable) {{
+                mockTable.currentSnapshot();
+                result = null;
+                icebergScanNode.createTableScan();
+                result = tableScan;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        new Expectations(mockTable) {{
+                mockTable.currentSnapshot();
+                result = snapshot;
+            }};
+        new Expectations(sessionVariable) {{
+                sessionVariable.getEnableExternalTableBatchMode();
+                result = false;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+
+        new Expectations(sessionVariable) {{
+                sessionVariable.getEnableExternalTableBatchMode();
+                result = true;
+            }};
+        new Expectations(icebergScanNode) {{
+                Deencapsulation.setField(icebergScanNode, 
"preExecutionAuthenticator", new PreExecutionAuthenticator());
+            }};
+        new Expectations() {{
+                sessionVariable.getNumFilesInBatchMode();
+                result = 1024;
+            }};
+
+        mockManifestFile("p", 10, 0);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 1024, 0);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 1024);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        new Expectations() {{
+                sessionVariable.getNumFilesInBatchMode();
+                result = 100;
+            }};
+
+        mockManifestFile("p", 10, 0);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 100);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 100, 0);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 90);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+    }
+
+    private void mockManifestFile(String path, int addedFileCount, int 
existingFileCount) {
+        new MockUp<IcebergUtils>() {
+            @Mock
+            CloseableIterable<ManifestFile> 
getMatchingManifest(List<ManifestFile> dataManifests,
+                                                                Map<Integer, 
PartitionSpec> specsById,
+                                                                Expression 
dataFilte) {
+                return CloseableIterable.withNoopClose(new 
ArrayList<ManifestFile>() {{
+                        add(genManifestFile(path, addedFileCount, 
existingFileCount));
+                    }}
+                );
+            }
+        };
+    }
+
+    private ManifestFile genManifestFile(String path, int addedFileCount, int 
existingFileCount) {
+        return new GenericManifestFile(
+            path,
+            10, // length
+            1, // specId
+            ManifestContent.DATA,
+            1, // sequenceNumber
+            1, // minSeqNumber
+            1L, // snapshotid
+            addedFileCount,
+            1,
+            existingFileCount,
+            1,
+            0, // deleteFilesCount
+            0,
+            Lists.newArrayList(),
+            null
+        );
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to