This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c1275261a2c37c69b0236a4cc151a0b97b98452d
Author: jerry <[email protected]>
AuthorDate: Thu Mar 26 11:02:27 2026 +0800

    [core] Respect rowsPerShard as hard constraint in global index build (#7530)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 +--
 .../globalindex/GlobalIndexBuilderUtils.java       | 17 -----
 .../globalindex/GlobalIndexBuilderUtilsTest.java   | 81 ----------------------
 .../flink/globalindex/GenericIndexTopoBuilder.java |  7 --
 .../globalindex/DefaultGlobalIndexTopoBuilder.java |  9 ---
 6 files changed, 5 insertions(+), 119 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 220ca4cdc4..6dd621fcac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -666,7 +666,7 @@ under the License.
             <td><h5>global-index.build.max-shard</h5></td>
             <td style="word-wrap: break-word;">32</td>
             <td>Integer</td>
-            <td>The max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, 'global-index.row-count-per-shard' will be recalculated as ceil(total-row-count / max-shard) to guarantee the shard count does not exceed max-shard.</td>
+            <td>The preferred max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, max-shard will be automatically increased to accommodate the data volume while keeping 'global-index.row-count-per-shard' unchanged.</td>
         </tr>
         <tr>
             <td><h5>global-index.column-update-action</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 4dc495c12b..7c054b8c8a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2322,11 +2322,11 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .defaultValue(32)
                     .withDescription(
-                            "The max number of shards for building global index. "
+                            "The preferred max number of shards for building global index. "
                                     + "If the number of shards calculated by 'global-index.row-count-per-shard' "
-                                    + "exceeds this value, 'global-index.row-count-per-shard' will be "
-                                    + "recalculated as ceil(total-row-count / max-shard) to guarantee "
-                                    + "the shard count does not exceed max-shard.");
+                                    + "exceeds this value, max-shard will be automatically increased "
+                                    + "to accommodate the data volume while keeping "
+                                    + "'global-index.row-count-per-shard' unchanged.");
 
     public static final ConfigOption<Integer> GLOBAL_INDEX_BUILD_MAX_PARALLELISM =
             key("global-index.build.max-parallelism")
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index 19ffc3bb40..085423efa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -71,23 +71,6 @@ public class GlobalIndexBuilderUtils {
         return results;
     }
 
-    /**
-     * Adjust rowsPerShard if the estimated shard count exceeds maxShard. Returns the adjusted
-     * rowsPerShard value.
-     */
-    public static long adjustRowsPerShard(long rowsPerShard, long totalRowCount, int maxShard) {
-        long estimatedShards = ceilDivision(totalRowCount, rowsPerShard);
-        if (estimatedShards > maxShard) {
-            return ceilDivision(totalRowCount, maxShard);
-        }
-        return rowsPerShard;
-    }
-
-    /** Integer ceiling division: returns ceil(a / b) for positive a and b. */
-    private static long ceilDivision(long a, long b) {
-        return (a + b - 1) / b;
-    }
-
     public static GlobalIndexWriter createIndexWriter(
             FileStoreTable table, String indexType, DataField indexField, Options options)
             throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java
deleted file mode 100644
index 612f41e0e9..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.globalindex;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalIndexBuilderUtils#adjustRowsPerShard}. */
-public class GlobalIndexBuilderUtilsTest {
-
-    @Test
-    void testAdjustRowsPerShardNoAdjustmentNeeded() {
-        // 1000 rows, 100 per shard = 10 shards, maxShard = 20 -> no adjustment
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 20);
-        assertThat(result).isEqualTo(100);
-    }
-
-    @Test
-    void testAdjustRowsPerShardExactMatch() {
-        // 1000 rows, 100 per shard = 10 shards, maxShard = 10 -> no adjustment
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 10);
-        assertThat(result).isEqualTo(100);
-    }
-
-    @Test
-    void testAdjustRowsPerShardExceedsMaxShard() {
-        // 1000 rows, 100 per shard = 10 shards, maxShard = 3 -> adjust to ceil(1000/3) = 334
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 3);
-        assertThat(result).isEqualTo(334);
-        // Verify: ceil(1000/334) = 3 shards
-        assertThat((1000 + result - 1) / result).isEqualTo(3);
-    }
-
-    @Test
-    void testAdjustRowsPerShardMaxShardOne() {
-        // 1000 rows, 100 per shard = 10 shards, maxShard = 1 -> all in one shard
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 1);
-        assertThat(result).isEqualTo(1000);
-        assertThat((1000 + result - 1) / result).isEqualTo(1);
-    }
-
-    @Test
-    void testAdjustRowsPerShardEvenDivision() {
-        // 1000 rows, 100 per shard = 10 shards, maxShard = 5 -> adjust to 200
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 5);
-        assertThat(result).isEqualTo(200);
-        assertThat((1000 + result - 1) / result).isEqualTo(5);
-    }
-
-    @Test
-    void testAdjustRowsPerShardLargeRowCount() {
-        // 10M rows, 100K per shard = 100 shards, maxShard = 10 -> adjust to 1M
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100000, 10000000, 10);
-        assertThat(result).isEqualTo(1000000);
-        assertThat((10000000 + result - 1) / result).isEqualTo(10);
-    }
-
-    @Test
-    void testAdjustRowsPerShardTotalRowsLessThanRowsPerShard() {
-        // 50 rows, 100 per shard = 1 shard, maxShard = 3 -> no adjustment
-        long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 50, 3);
-        assertThat(result).isEqualTo(100);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index 44239b3876..865280763c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.NoopCommittableStateManager;
 import org.apache.paimon.flink.sink.StoreCommitter;
 import org.apache.paimon.flink.utils.BoundedOneInputOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
-import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
 import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.index.IndexFileMeta;
@@ -159,12 +158,6 @@ public class GenericIndexTopoBuilder {
                 rowsPerShard > 0,
                 "Option 'global-index.row-count-per-shard' must be greater than 0.");
 
-        int maxShard = mergedOptions.get(CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD);
-        checkArgument(
-                maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0.");
-        rowsPerShard =
-                GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard);
-
         // Compute shard tasks at file level from the provided entries
         List<ShardTask> shardTasks = computeShardTasks(table, entries, rowsPerShard);
         if (shardTasks.isEmpty()) {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
index 04593c55a3..afd954c39a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.globalindex;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexBuilderUtils;
 import org.apache.paimon.globalindex.IndexedSplit;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -56,7 +55,6 @@ import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD;
 import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ROW_COUNT_PER_SHARD;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -88,15 +86,8 @@ public class DefaultGlobalIndexTopoBuilder implements GlobalIndexTopologyBuilder
                 rowsPerShard > 0,
                 "Option 'global-index.row-count-per-shard' must be greater than 0.");
 
-        int maxShard = tableOptions.get(GLOBAL_INDEX_BUILD_MAX_SHARD);
-        checkArgument(
-                maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0.");
         List<ManifestEntry> entries =
                 table.store().newScan().withPartitionFilter(partitionPredicate).plan().files();
-        long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum();
-        rowsPerShard =
-                GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard);
-
         // generate splits for each partition && shard
         Map<BinaryRow, List<IndexedSplit>> splits = split(table, entries, rowsPerShard);
 

Reply via email to