This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c1275261a2c37c69b0236a4cc151a0b97b98452d Author: jerry <[email protected]> AuthorDate: Thu Mar 26 11:02:27 2026 +0800 [core] Respect rowsPerShard as hard constraint in global index build (#7530) --- .../shortcodes/generated/core_configuration.html | 2 +- .../main/java/org/apache/paimon/CoreOptions.java | 8 +-- .../globalindex/GlobalIndexBuilderUtils.java | 17 ----- .../globalindex/GlobalIndexBuilderUtilsTest.java | 81 ---------------------- .../flink/globalindex/GenericIndexTopoBuilder.java | 7 -- .../globalindex/DefaultGlobalIndexTopoBuilder.java | 9 --- 6 files changed, 5 insertions(+), 119 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 220ca4cdc4..6dd621fcac 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -666,7 +666,7 @@ under the License. <td><h5>global-index.build.max-shard</h5></td> <td style="word-wrap: break-word;">32</td> <td>Integer</td> - <td>The max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, 'global-index.row-count-per-shard' will be recalculated as ceil(total-row-count / max-shard) to guarantee the shard count does not exceed max-shard.</td> + <td>The preferred max number of shards for building global index. If the number of shards calculated by 'global-index.row-count-per-shard' exceeds this value, max-shard will be automatically increased to accommodate the data volume while keeping 'global-index.row-count-per-shard' unchanged.</td> </tr> <tr> <td><h5>global-index.column-update-action</h5></td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 4dc495c12b..7c054b8c8a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2322,11 +2322,11 @@ public class CoreOptions implements Serializable { .intType() .defaultValue(32) .withDescription( - "The max number of shards for building global index. " + "The preferred max number of shards for building global index. " + "If the number of shards calculated by 'global-index.row-count-per-shard' " - + "exceeds this value, 'global-index.row-count-per-shard' will be " - + "recalculated as ceil(total-row-count / max-shard) to guarantee " - + "the shard count does not exceed max-shard."); + + "exceeds this value, max-shard will be automatically increased " + + "to accommodate the data volume while keeping " + + "'global-index.row-count-per-shard' unchanged."); public static final ConfigOption<Integer> GLOBAL_INDEX_BUILD_MAX_PARALLELISM = key("global-index.build.max-parallelism") diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java index 19ffc3bb40..085423efa8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java @@ -71,23 +71,6 @@ public class GlobalIndexBuilderUtils { return results; } - /** - * Adjust rowsPerShard if the estimated shard count exceeds maxShard. Returns the adjusted - * rowsPerShard value. - */ - public static long adjustRowsPerShard(long rowsPerShard, long totalRowCount, int maxShard) { - long estimatedShards = ceilDivision(totalRowCount, rowsPerShard); - if (estimatedShards > maxShard) { - return ceilDivision(totalRowCount, maxShard); - } - return rowsPerShard; - } - - /** Integer ceiling division: returns ceil(a / b) for positive a and b. */ - private static long ceilDivision(long a, long b) { - return (a + b - 1) / b; - } - public static GlobalIndexWriter createIndexWriter( FileStoreTable table, String indexType, DataField indexField, Options options) throws IOException { diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java deleted file mode 100644 index 612f41e0e9..0000000000 --- a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.globalindex; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link GlobalIndexBuilderUtils#adjustRowsPerShard}. */ -public class GlobalIndexBuilderUtilsTest { - - @Test - void testAdjustRowsPerShardNoAdjustmentNeeded() { - // 1000 rows, 100 per shard = 10 shards, maxShard = 20 -> no adjustment - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 20); - assertThat(result).isEqualTo(100); - } - - @Test - void testAdjustRowsPerShardExactMatch() { - // 1000 rows, 100 per shard = 10 shards, maxShard = 10 -> no adjustment - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 10); - assertThat(result).isEqualTo(100); - } - - @Test - void testAdjustRowsPerShardExceedsMaxShard() { - // 1000 rows, 100 per shard = 10 shards, maxShard = 3 -> adjust to ceil(1000/3) = 334 - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 3); - assertThat(result).isEqualTo(334); - // Verify: ceil(1000/334) = 3 shards - assertThat((1000 + result - 1) / result).isEqualTo(3); - } - - @Test - void testAdjustRowsPerShardMaxShardOne() { - // 1000 rows, 100 per shard = 10 shards, maxShard = 1 -> all in one shard - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 1); - assertThat(result).isEqualTo(1000); - assertThat((1000 + result - 1) / result).isEqualTo(1); - } - - @Test - void testAdjustRowsPerShardEvenDivision() { - // 1000 rows, 100 per shard = 10 shards, maxShard = 5 -> adjust to 200 - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 1000, 5); - assertThat(result).isEqualTo(200); - assertThat((1000 + result - 1) / result).isEqualTo(5); - } - - @Test - void testAdjustRowsPerShardLargeRowCount() { - // 10M rows, 100K per shard = 100 shards, maxShard = 10 -> adjust to 1M - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100000, 10000000, 10); - assertThat(result).isEqualTo(1000000); - assertThat((10000000 + result - 1) / result).isEqualTo(10); - } - - @Test - void testAdjustRowsPerShardTotalRowsLessThanRowsPerShard() { - // 50 rows, 100 per shard = 1 shard, maxShard = 3 -> no adjustment - long result = GlobalIndexBuilderUtils.adjustRowsPerShard(100, 50, 3); - assertThat(result).isEqualTo(100); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 44239b3876..865280763c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.NoopCommittableStateManager; import org.apache.paimon.flink.sink.StoreCommitter; import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; -import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; @@ -159,12 +158,6 @@ public class GenericIndexTopoBuilder { rowsPerShard > 0, "Option 'global-index.row-count-per-shard' must be greater than 0."); - int maxShard = mergedOptions.get(CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD); - checkArgument( - maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0."); - rowsPerShard = - GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard); - // Compute shard tasks at file level from the provided entries List<ShardTask> shardTasks = computeShardTasks(table, entries, rowsPerShard); if (shardTasks.isEmpty()) { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java index 04593c55a3..afd954c39a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java @@ -21,7 +21,6 @@ package org.apache.paimon.spark.globalindex; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; -import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; @@ -56,7 +55,6 @@ import java.util.Map; import java.util.function.BiFunction; import java.util.stream.Collectors; -import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_BUILD_MAX_SHARD; import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ROW_COUNT_PER_SHARD; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -88,15 +86,8 @@ public class DefaultGlobalIndexTopoBuilder implements GlobalIndexTopologyBuilder rowsPerShard > 0, "Option 'global-index.row-count-per-shard' must be greater than 0."); - int maxShard = tableOptions.get(GLOBAL_INDEX_BUILD_MAX_SHARD); - checkArgument( - maxShard > 0, "Option 'global-index.build.max-shard' must be greater than 0."); List<ManifestEntry> entries = table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); - long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); - rowsPerShard = - GlobalIndexBuilderUtils.adjustRowsPerShard(rowsPerShard, totalRowCount, maxShard); - // generate splits for each partition && shard Map<BinaryRow, List<IndexedSplit>> splits = split(table, entries, rowsPerShard);
