This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 974b725f63 [core] Introduce BucketSelector based on partition values
to achieve bucket level predicate push down (#7486)
974b725f63 is described below
commit 974b725f636fc6bb31c0efb72b2ae3ad83d174aa
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 20 13:32:15 2026 +0800
[core] Introduce BucketSelector based on partition values to achieve bucket
level predicate push down (#7486)
Introducing BucketSelector based on partition values to achieve bucket
level predicate push down optimization.
Case 1: bucket filtering with compound predicates on a single-field
bucket key.
Table schema:
- Partition key: column 'a' (INT)
- Bucket key: column 'b' (INT)
- Bucket count: 10
Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) =
100 rows.
Scenarios:
- Predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) - Tests partition
range filter with bucket equality, combined with OR. Expected: buckets
for partition 1,2 with b=5 and partition 3 with b=7.
- Predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) - Tests partition
range with bucket equality, OR partition equality with bucket range.
Expected: mixed buckets from partition 3 and specific buckets from
partitions 1,2.
- Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests partition
equality with bucket equality in both OR branches. Expected: exact
bucket matching for each partition-b combination.
Case 2: bucket filtering with compound predicates on a composite
(multi-field) bucket key.
Table schema:
- Partition key: column 'a' (INT)
- Bucket key: columns 'b' and 'c' (composite, INT)
- Bucket count: 10
Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) ×
10 c-values (c=0 to 9) = 1000 rows.
Test scenarios:
- Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests
nested OR within AND, with partition range, bucket field equality, and
additional bucket field filter. The 'c = 5' condition is part of the
composite bucket key, affecting bucket selection.
- Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 -
Tests range predicate on one bucket field (b) combined with equality on
another (c). Validates handling of multiple bucket key fields with
different predicate types.
- Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests
exact matching on both partition and bucket fields. The composite bucket
key (b,c) ensures precise bucket targeting.
---
.../predicate/PartitionValuePredicateVisitor.java | 91 +++++
.../paimon/predicate/PredicateReplaceVisitor.java | 6 +-
.../TriFilter.java} | 29 +-
.../PartitionValuePredicateVisitorTest.java | 375 +++++++++++++++++++++
.../org/apache/paimon/AppendOnlyFileStore.java | 33 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 33 +-
.../org/apache/paimon/manifest/BucketFilter.java | 14 +-
.../apache/paimon/manifest/ManifestEntryCache.java | 12 +-
.../paimon/operation/AbstractFileStoreScan.java | 18 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 5 +
.../paimon/operation/BucketSelectConverter.java | 171 ++--------
...ketSelectConverter.java => BucketSelector.java} | 92 +++--
.../org/apache/paimon/operation/FileStoreScan.java | 5 +-
.../paimon/operation/KeyValueFileStoreScan.java | 5 +
.../paimon/table/PrimaryKeyFileStoreTable.java | 2 -
.../table/source/snapshot/SnapshotReaderImpl.java | 1 +
.../test/java/org/apache/paimon/TestFileStore.java | 1 -
.../apache/paimon/manifest/BucketFilterTest.java | 163 ---------
.../operation/BucketSelectConverterTest.java | 148 --------
.../paimon/operation/BucketSelectorTest.java | 258 ++++++++++++++
.../apache/paimon/table/BucketFilterScanTest.java | 250 ++++++++++++++
21 files changed, 1149 insertions(+), 563 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
new file mode 100644
index 0000000000..943b751870
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link PredicateReplaceVisitor} that evaluates partition predicates
against a known partition
+ * value, replacing them with {@link AlwaysTrue} or {@link AlwaysFalse}.
+ *
+ * <p>For leaf predicates that only reference partition fields, the predicate
is evaluated against
+ * the given partition row (with field indices remapped from table schema to
partition schema). If
+ * the evaluation returns true, the predicate is replaced with AlwaysTrue;
otherwise with
+ * AlwaysFalse.
+ *
+ * <p>For leaf predicates that reference any non-partition field, the
predicate is kept as-is.
+ *
+ * <p>For compound predicates (AND/OR), children are recursively visited and
the result is
+ * simplified via {@link PredicateBuilder#and} / {@link PredicateBuilder#or}.
+ */
+public class PartitionValuePredicateVisitor implements PredicateReplaceVisitor
{
+
+ private final Set<String> partitionFields;
+
+ /** Mapping from table field index to partition field index. -1 if not a
partition field. */
+ private final int[] tableToPartitionMapping;
+
+ private final InternalRow partitionRow;
+
+ public PartitionValuePredicateVisitor(
+ RowType tableType, RowType partitionType, InternalRow
partitionRow) {
+ this.partitionRow = partitionRow;
+ this.partitionFields = new HashSet<>(partitionType.getFieldNames());
+
+ List<String> tableFieldNames = tableType.getFieldNames();
+ List<String> partitionFieldNames = partitionType.getFieldNames();
+
+ this.tableToPartitionMapping = new int[tableFieldNames.size()];
+ for (int i = 0; i < tableFieldNames.size(); i++) {
+ tableToPartitionMapping[i] =
partitionFieldNames.indexOf(tableFieldNames.get(i));
+ }
+ }
+
+ @Override
+ public Optional<Predicate> visit(LeafPredicate predicate) {
+ Set<String> refFields = PredicateVisitor.collectFieldNames(predicate);
+ if (!partitionFields.containsAll(refFields)) {
+ return Optional.of(predicate);
+ }
+
+ // Remap field indices from table schema to partition schema
+ List<Object> remappedInputs = new ArrayList<>();
+ for (Object input : predicate.transform().inputs()) {
+ if (input instanceof FieldRef) {
+ FieldRef ref = (FieldRef) input;
+ int partIdx = tableToPartitionMapping[ref.index()];
+ remappedInputs.add(new FieldRef(partIdx, ref.name(),
ref.type()));
+ } else {
+ remappedInputs.add(input);
+ }
+ }
+
+ // Evaluate the remapped predicate against the known partition row
+ LeafPredicate remapped = predicate.copyWithNewInputs(remappedInputs);
+ boolean result = remapped.test(partitionRow);
+ return Optional.of(result ? PredicateBuilder.alwaysTrue() :
PredicateBuilder.alwaysFalse());
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
index aeaa5d5aee..b71941c0cc 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
@@ -37,6 +37,10 @@ public interface PredicateReplaceVisitor extends
PredicateVisitor<Optional<Predi
return Optional.empty();
}
}
- return Optional.of(new CompoundPredicate(predicate.function(),
converted));
+ if (predicate.function() instanceof And) {
+ return Optional.of(PredicateBuilder.and(converted));
+ } else {
+ return Optional.of(PredicateBuilder.or(converted));
+ }
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
similarity index 51%
copy from
paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
index aeaa5d5aee..8a5de1e631 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
@@ -16,27 +16,18 @@
* limitations under the License.
*/
-package org.apache.paimon.predicate;
+package org.apache.paimon.utils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/** A {@link PredicateVisitor} to replace {@link Predicate}. */
+/** Represents a filter (boolean-valued function) of three arguments. */
@FunctionalInterface
-public interface PredicateReplaceVisitor extends
PredicateVisitor<Optional<Predicate>> {
+public interface TriFilter<T, U, R> {
+
+ TriFilter<?, ?, ?> ALWAYS_TRUE = (t, u, r) -> true;
+
+ boolean test(T t, U u, R r);
- @Override
- default Optional<Predicate> visit(CompoundPredicate predicate) {
- List<Predicate> converted = new ArrayList<>();
- for (Predicate child : predicate.children()) {
- Optional<Predicate> optional = child.visit(this);
- if (optional.isPresent()) {
- converted.add(optional.get());
- } else {
- return Optional.empty();
- }
- }
- return Optional.of(new CompoundPredicate(predicate.function(),
converted));
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ static <T, U, R> TriFilter<T, U, R> alwaysTrue() {
+ return (TriFilter) ALWAYS_TRUE;
}
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
new file mode 100644
index 0000000000..7486d00372
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PartitionValuePredicateVisitor}. */
+public class PartitionValuePredicateVisitorTest {
+
+ // Table schema: (pt INT, a INT, b INT), partition key: pt
+ private static final RowType TABLE_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt", DataTypes.INT()),
+ DataTypes.FIELD(1, "a", DataTypes.INT()),
+ DataTypes.FIELD(2, "b", DataTypes.INT()));
+
+ private static final RowType PARTITION_TYPE =
+ DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT()));
+
+ private static final PredicateBuilder BUILDER = new
PredicateBuilder(TABLE_TYPE);
+
+ // ========================== Leaf: partition field
==========================
+
+ @Test
+ public void testPartitionEqualMatch() {
+ // pt = 1, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.equal(0, 1).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionEqualNoMatch() {
+ // pt = 2, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.equal(0, 2).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionGreaterThanMatch() {
+ // pt > 0, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.greaterThan(0, 0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionGreaterThanNoMatch() {
+ // pt > 5, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.greaterThan(0, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionLessOrEqualMatch() {
+ // pt <= 1, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.lessOrEqual(0, 1).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionBetweenMatch() {
+ // pt BETWEEN 0 AND 5, partition value is pt=3 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(3));
+
+ Optional<Predicate> result = BUILDER.between(0, 0, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testPartitionBetweenNoMatch() {
+ // pt BETWEEN 5 AND 10, partition value is pt=3 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(3));
+
+ Optional<Predicate> result = BUILDER.between(0, 5, 10).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionIsNullNoMatch() {
+ // pt IS NULL, partition value is pt=1 => AlwaysFalse
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.isNull(0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testPartitionIsNotNullMatch() {
+ // pt IS NOT NULL, partition value is pt=1 => AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Optional<Predicate> result = BUILDER.isNotNull(0).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ // ========================== Leaf: non-partition field
==========================
+
+ @Test
+ public void testNonPartitionFieldKeptAsIs() {
+ // a = 5 is not a partition predicate => kept unchanged
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate original = BUILDER.equal(1, 5);
+ Optional<Predicate> result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Compound: AND ==========================
+
+ @Test
+ public void testAndPartitionMatchAndNonPartition() {
+ // (pt = 1 AND a = 5), partition value is pt=1
+ // => AlwaysTrue simplified away by PredicateBuilder.and(), leaving
just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1),
BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ @Test
+ public void testAndPartitionNoMatchAndNonPartition() {
+ // (pt = 2 AND a = 5), partition value is pt=1
+ // => AlwaysFalse short-circuits the entire AND
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 2),
BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+ }
+
+ @Test
+ public void testAndAllPartitionMatch() {
+ // (pt = 1 AND pt > 0), partition value is pt=1
+ // => both AlwaysTrue, simplified to AlwaysTrue
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1),
BUILDER.greaterThan(0, 0));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ // ========================== Compound: OR ==========================
+
+ @Test
+ public void testOrPartitionPredicates() {
+ // (pt = 1 OR pt = 2), partition value is pt=1
+ // => AlwaysTrue short-circuits the entire OR
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1),
BUILDER.equal(0, 2));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testOrPartitionMatchAndNonPartition() {
+ // (pt = 1 OR a = 5), partition value is pt=1
+ // => AlwaysTrue short-circuits the entire OR
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1),
BUILDER.equal(1, 5));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testOrPartitionNoMatchAndNonPartition() {
+ // (pt = 2 OR a = 5), partition value is pt=1
+ // => AlwaysFalse filtered out, leaving just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 2),
BUILDER.equal(1, 5));
+ Optional<Predicate> result = or.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ // ========================== Multiple partition keys
==========================
+
+ @Test
+ public void testMultiplePartitionFieldsBothMatch() {
+ // Table: (pt1 INT, pt2 STRING, a INT), partition keys: (pt1, pt2)
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+ DataTypes.FIELD(2, "a", DataTypes.INT()));
+ RowType partitionType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ // pt1 = 1 AND pt2 = 'x', partition value is (1, 'x') => both match =>
AlwaysTrue
+ GenericRow partitionRow = GenericRow.of(1,
BinaryString.fromString("x"));
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType,
partitionRow);
+
+ Predicate and =
+ PredicateBuilder.and(
+ builder.equal(0, 1), builder.equal(1,
BinaryString.fromString("x")));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+ }
+
+ @Test
+ public void testMultiplePartitionFieldsPartialMatch() {
+ // pt1 = 1 AND pt2 = 'y', partition value is (1, 'x') => pt1 matches,
pt2 doesn't
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+ DataTypes.FIELD(2, "a", DataTypes.INT()));
+ RowType partitionType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ GenericRow partitionRow = GenericRow.of(1,
BinaryString.fromString("x"));
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType,
partitionRow);
+
+ Predicate and =
+ PredicateBuilder.and(
+ builder.equal(0, 1), builder.equal(1,
BinaryString.fromString("y")));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ // pt2 doesn't match => AlwaysFalse short-circuits the entire AND
+ assertAlwaysFalse(result.get());
+ }
+
+ // ========================== Partition field not at index 0
==========================
+
+ @Test
+ public void testPartitionFieldNotFirstInTable() {
+ // Table: (a INT, pt INT, b INT), partition key: pt (index 1 in table,
index 0 in partition)
+ RowType tableType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "a", DataTypes.INT()),
+ DataTypes.FIELD(1, "pt", DataTypes.INT()),
+ DataTypes.FIELD(2, "b", DataTypes.INT()));
+ RowType partitionType = DataTypes.ROW(DataTypes.FIELD(0, "pt",
DataTypes.INT()));
+ PredicateBuilder builder = new PredicateBuilder(tableType);
+
+ // pt = 3, partition value is pt=3
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(tableType, partitionType,
GenericRow.of(3));
+
+ Optional<Predicate> result = builder.equal(1, 3).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysTrue(result.get());
+
+ // pt = 5, partition value is pt=3 => AlwaysFalse
+ result = builder.equal(1, 5).visit(visitor);
+ assertThat(result).isPresent();
+ assertAlwaysFalse(result.get());
+
+ // a = 10 => kept as-is (non-partition field)
+ Predicate original = builder.equal(0, 10);
+ result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Nested compound ==========================
+
+ @Test
+ public void testNestedAndOr() {
+ // ((pt = 1 OR pt = 2) AND a = 5), partition value is pt=1
+ // Inner OR: AlwaysTrue short-circuits to AlwaysTrue
+ // Outer AND: AlwaysTrue simplified away, leaving just a=5
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1),
BUILDER.equal(0, 2));
+ Predicate and = PredicateBuilder.and(or, BUILDER.equal(1, 5));
+ Optional<Predicate> result = and.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+ }
+
+ // ========================== Only non-partition predicates
==========================
+
+ @Test
+ public void testAllNonPartitionPredicatesUnchanged() {
+ // (a = 5 AND b = 10), partition value is pt=1 => kept unchanged
+ PartitionValuePredicateVisitor visitor =
+ new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE,
GenericRow.of(1));
+
+ Predicate original = PredicateBuilder.and(BUILDER.equal(1, 5),
BUILDER.equal(2, 10));
+ Optional<Predicate> result = original.visit(visitor);
+ assertThat(result).isPresent();
+ assertThat(result.get()).isEqualTo(original);
+ }
+
+ // ========================== Helpers ==========================
+
+ private static void assertAlwaysTrue(Predicate predicate) {
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+ assertThat(((LeafPredicate)
predicate).function()).isEqualTo(AlwaysTrue.INSTANCE);
+ }
+
+ private static void assertAlwaysFalse(Predicate predicate) {
+ assertThat(predicate).isInstanceOf(LeafPredicate.class);
+ assertThat(((LeafPredicate)
predicate).function()).isEqualTo(AlwaysFalse.INSTANCE);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ad71a1d595..f0f5e1c6ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -30,7 +30,6 @@ import
org.apache.paimon.operation.BucketedAppendFileStoreWrite;
import org.apache.paimon.operation.DataEvolutionFileStoreScan;
import org.apache.paimon.operation.DataEvolutionSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
@@ -41,12 +40,6 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -143,26 +136,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreScan newScan() {
BucketSelectConverter bucketSelectConverter =
- predicate -> {
- if (bucketMode() != BucketMode.HASH_FIXED) {
- return Optional.empty();
- }
-
- if (bucketKeyType.getFieldCount() == 0) {
- return Optional.empty();
- }
-
- List<Predicate> bucketFilters =
- pickTransformFieldMapping(
- splitAnd(predicate),
- rowType.getFieldNames(),
- bucketKeyType.getFieldNames());
- if (!bucketFilters.isEmpty()) {
- return BucketSelectConverter.create(
- and(bucketFilters), bucketKeyType,
options.bucketFunctionType());
- }
- return Optional.empty();
- };
+ new BucketSelectConverter(
+ bucketMode(),
+ options.bucketFunctionType(),
+ rowType,
+ partitionType,
+ bucketKeyType);
if (options().dataEvolutionEnabled()) {
return new DataEvolutionFileStoreScan(
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 710e2dc3a5..a1ae650b43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -33,7 +33,6 @@ import org.apache.paimon.operation.KeyValueFileStoreWrite;
import org.apache.paimon.operation.MergeFileSplitRead;
import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
-import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -48,18 +47,12 @@ import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
import java.util.function.Supplier;
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final boolean crossPartitionUpdate;
- private final RowType bucketKeyType;
private final RowType keyType;
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
@@ -74,7 +67,6 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
boolean crossPartitionUpdate,
CoreOptions options,
RowType partitionType,
- RowType bucketKeyType,
RowType keyType,
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
@@ -83,7 +75,6 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, tableName, options,
partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
- this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
this.valueType = valueType;
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
@@ -203,25 +194,13 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
@Override
public KeyValueFileStoreScan newScan() {
- BucketMode bucketMode = bucketMode();
BucketSelectConverter bucketSelectConverter =
- keyFilter -> {
- if (bucketMode != BucketMode.HASH_FIXED
- && bucketMode != BucketMode.POSTPONE_MODE) {
- return Optional.empty();
- }
-
- List<Predicate> bucketFilters =
- pickTransformFieldMapping(
- splitAnd(keyFilter),
- keyType.getFieldNames(),
- bucketKeyType.getFieldNames());
- if (!bucketFilters.isEmpty()) {
- return BucketSelectConverter.create(
- and(bucketFilters), bucketKeyType,
options.bucketFunctionType());
- }
- return Optional.empty();
- };
+ new BucketSelectConverter(
+ bucketMode(),
+ options.bucketFunctionType(),
+ schema.logicalRowType(),
+ partitionType,
+ schema.logicalBucketKeyType());
return new KeyValueFileStoreScan(
newManifestsReader(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
index 45cd074a5a..4662d7dab5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
@@ -18,8 +18,9 @@
package org.apache.paimon.manifest;
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.TriFilter;
import javax.annotation.Nullable;
@@ -29,13 +30,13 @@ public class BucketFilter {
private final boolean onlyReadRealBuckets;
private final @Nullable Integer specifiedBucket;
private final @Nullable Filter<Integer> bucketFilter;
- private final @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter;
+ private final @Nullable TriFilter<BinaryRow, Integer, Integer>
totalAwareBucketFilter;
public BucketFilter(
boolean onlyReadRealBuckets,
@Nullable Integer specifiedBucket,
@Nullable Filter<Integer> bucketFilter,
- @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ @Nullable TriFilter<BinaryRow, Integer, Integer>
totalAwareBucketFilter) {
this.onlyReadRealBuckets = onlyReadRealBuckets;
this.specifiedBucket = specifiedBucket;
this.bucketFilter = bucketFilter;
@@ -46,7 +47,7 @@ public class BucketFilter {
boolean onlyReadRealBuckets,
@Nullable Integer specifiedBucket,
@Nullable Filter<Integer> bucketFilter,
- @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ @Nullable TriFilter<BinaryRow, Integer, Integer>
totalAwareBucketFilter) {
if (!onlyReadRealBuckets
&& specifiedBucket == null
&& bucketFilter == null
@@ -63,7 +64,7 @@ public class BucketFilter {
return specifiedBucket;
}
- public boolean test(int bucket, int totalBucket) {
+ public boolean test(BinaryRow partition, int bucket, int totalBucket) {
if (onlyReadRealBuckets && bucket < 0) {
return false;
}
@@ -73,6 +74,7 @@ public class BucketFilter {
if (bucketFilter != null && !bucketFilter.test(bucket)) {
return false;
}
- return totalAwareBucketFilter == null ||
totalAwareBucketFilter.test(bucket, totalBucket);
+ return totalAwareBucketFilter == null
+ || totalAwareBucketFilter.test(partition, bucket, totalBucket);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 59b2a34650..09afdcc3ac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -125,10 +125,10 @@ public class ManifestEntryCache extends
ObjectsCache<Path, ManifestEntry, Manife
List<RichSegments> segments = manifestSegments.segments();
// try to do fast filter first
- Optional<BinaryRow> partition =
extractSinglePartition(partitionFilter);
- if (partition.isPresent()) {
+ Optional<BinaryRow> singlePartition =
extractSinglePartition(partitionFilter);
+ if (singlePartition.isPresent()) {
Map<Integer, List<RichSegments>> segMap =
- manifestSegments.indexedSegments().get(partition.get());
+
manifestSegments.indexedSegments().get(singlePartition.get());
if (segMap == null) {
return Collections.emptyList();
}
@@ -147,11 +147,13 @@ public class ManifestEntryCache extends
ObjectsCache<Path, ManifestEntry, Manife
// do force loop filter
List<Segments> segmentsList = new ArrayList<>();
for (RichSegments richSegments : segments) {
- if (partitionFilter != null &&
!partitionFilter.test(richSegments.partition())) {
+ BinaryRow partition = richSegments.partition();
+ if (partitionFilter != null && !partitionFilter.test(partition)) {
continue;
}
if (bucketFilter != null
- && !bucketFilter.test(richSegments.bucket(),
richSegments.totalBucket())) {
+ && !bucketFilter.test(
+ partition, richSegments.bucket(),
richSegments.totalBucket())) {
continue;
}
segmentsList.add(richSegments.segments());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 68ebacaa80..30908cce3e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -46,6 +46,7 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RowRangeIndex;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TriFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,7 +88,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private boolean onlyReadRealBuckets = false;
private Integer specifiedBucket = null;
private Filter<Integer> bucketFilter = null;
- private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
+ private TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter =
null;
protected ScanMode scanMode = ScanMode.ALL;
private Integer specifiedLevel = null;
private Filter<Integer> levelFilter = null;
@@ -162,7 +163,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withTotalAwareBucketFilter(
- BiFilter<Integer, Integer> totalAwareBucketFilter) {
+ TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter) {
this.totalAwareBucketFilter = totalAwareBucketFilter;
return this;
}
@@ -533,14 +534,21 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Function<InternalRow, Integer> levelGetter =
ManifestEntrySerializer.levelGetter();
BucketFilter bucketFilter = createBucketFilter();
return row -> {
- if ((partitionFilter != null &&
!partitionFilter.test(partitionGetter.apply(row)))) {
- return false;
+ BinaryRow partition = null;
+ if (partitionFilter != null) {
+ partition = partitionGetter.apply(row);
+ if (!partitionFilter.test(partition)) {
+ return false;
+ }
}
if (bucketFilter != null) {
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
- if (!bucketFilter.test(bucket, totalBucket)) {
+ if (partition == null) {
+ partition = partitionGetter.apply(row);
+ }
+ if (!bucketFilter.test(partition, bucket, totalBucket)) {
return false;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 3546a1fcae..2714e773a2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -86,6 +86,11 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
this.inputFilter = predicate;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withCompleteFilter(Predicate predicate) {
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
index 6577099aa8..e441641f7e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
@@ -19,160 +19,55 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.bucket.BucketFunction;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.predicate.Equal;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.In;
-import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
/** Bucket filter push down in scan to skip files. */
-public interface BucketSelectConverter {
-
- int MAX_VALUES = 1000;
-
- Optional<BiFilter<Integer, Integer>> convert(Predicate predicate);
-
- static Optional<BiFilter<Integer, Integer>> create(
- Predicate bucketPredicate,
- RowType bucketKeyType,
- BucketFunctionType bucketFunctionType) {
- @SuppressWarnings("unchecked")
- List<Object>[] bucketValues = new List[bucketKeyType.getFieldCount()];
-
- BucketFunction bucketFunction =
BucketFunction.create(bucketFunctionType, bucketKeyType);
-
- nextAnd:
- for (Predicate andPredicate : splitAnd(bucketPredicate)) {
- Integer reference = null;
- List<Object> values = new ArrayList<>();
- for (Predicate orPredicate : splitOr(andPredicate)) {
- if (orPredicate instanceof LeafPredicate) {
- LeafPredicate leaf = (LeafPredicate) orPredicate;
- Optional<FieldRef> fieldRefOptional =
leaf.fieldRefOptional();
- if (fieldRefOptional.isPresent()) {
- FieldRef fieldRef = fieldRefOptional.get();
- if (reference == null || reference ==
fieldRef.index()) {
- reference = fieldRef.index();
- if (leaf.function().equals(Equal.INSTANCE)
- || leaf.function().equals(In.INSTANCE)) {
- values.addAll(
- leaf.literals().stream()
- .filter(Objects::nonNull)
- .collect(Collectors.toList()));
- continue;
- }
- }
- }
- }
-
- // failed, go to next predicate
- continue nextAnd;
- }
- if (reference != null) {
- if (bucketValues[reference] != null) {
- // Repeated equals in And?
- return Optional.empty();
- }
-
- bucketValues[reference] = values;
- }
- }
-
- int rowCount = 1;
- for (List<Object> values : bucketValues) {
- if (values == null) {
- return Optional.empty();
- }
-
- rowCount *= values.size();
- if (rowCount > MAX_VALUES) {
- return Optional.empty();
- }
- }
-
- InternalRowSerializer serializer = new
InternalRowSerializer(bucketKeyType);
- List<BinaryRow> bucketKeys = new ArrayList<>();
- assembleRows(
- bucketValues,
- columns ->
- bucketKeys.add(
-
serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()),
- new ArrayList<>(),
- 0);
-
- return Optional.of(new Selector(bucketKeys, bucketFunction));
+public class BucketSelectConverter {
+
+ private final BucketMode bucketMode;
+ private final BucketFunctionType bucketFunctionType;
+ private final RowType rowType;
+ private final RowType partitionType;
+ private final RowType bucketKeyType;
+
+ public BucketSelectConverter(
+ BucketMode bucketMode,
+ BucketFunctionType bucketFunctionType,
+ RowType rowType,
+ RowType partitionType,
+ RowType bucketKeyType) {
+ this.bucketMode = bucketMode;
+ this.bucketFunctionType = bucketFunctionType;
+ this.rowType = rowType;
+ this.partitionType = partitionType;
+ this.bucketKeyType = bucketKeyType;
}
- static void assembleRows(
- List<Object>[] rowValues,
- Consumer<List<Object>> consumer,
- List<Object> stack,
- int columnIndex) {
- List<Object> columnValues = rowValues[columnIndex];
- for (Object value : columnValues) {
- stack.add(value);
- if (columnIndex == rowValues.length - 1) {
- // last column, consume row
- consumer.accept(stack);
- } else {
- assembleRows(rowValues, consumer, stack, columnIndex + 1);
- }
- stack.remove(stack.size() - 1);
+ public Optional<TriFilter<BinaryRow, Integer, Integer>> convert(Predicate
predicate) {
+ if (bucketMode != BucketMode.HASH_FIXED && bucketMode !=
BucketMode.POSTPONE_MODE) {
+ return Optional.empty();
}
- }
- /** Selector to select bucket from {@link Predicate}. */
- @ThreadSafe
- class Selector implements BiFilter<Integer, Integer> {
-
- private final List<BinaryRow> bucketKeys;
-
- private final BucketFunction bucketFunction;
-
- private final Map<Integer, Set<Integer>> buckets = new
ConcurrentHashMap<>();
-
- public Selector(List<BinaryRow> bucketKeys, BucketFunction
bucketFunction) {
- this.bucketKeys = bucketKeys;
- this.bucketFunction = bucketFunction;
+ if (bucketKeyType.getFieldCount() == 0) {
+ return Optional.empty();
}
- @Override
- public boolean test(Integer bucket, Integer numBucket) {
- return buckets.computeIfAbsent(numBucket, k ->
createBucketSet(numBucket))
- .contains(bucket);
+ Set<String> predicateFields = collectFieldNames(predicate);
+ if (!predicateFields.containsAll(bucketKeyType.getFieldNames())) {
+ return Optional.empty();
}
- @VisibleForTesting
- Set<Integer> createBucketSet(int numBucket) {
- ImmutableSet.Builder<Integer> builder = new
ImmutableSet.Builder<>();
- for (BinaryRow key : bucketKeys) {
- builder.add(bucketFunction.bucket(key, numBucket));
- }
- return builder.build();
- }
+ return Optional.of(
+ new BucketSelector(
+ predicate, bucketFunctionType, rowType, partitionType,
bucketKeyType));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
similarity index 63%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
copy to
paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
index 6577099aa8..8c28c3c0c6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
@@ -19,7 +19,6 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.bucket.BucketFunction;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
@@ -28,15 +27,18 @@ import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.In;
import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.PartitionValuePredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -46,20 +48,67 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
-/** Bucket filter push down in scan to skip files. */
-public interface BucketSelectConverter {
+/** Selector to select bucket from {@link Predicate}. */
+@ThreadSafe
+public class BucketSelector implements TriFilter<BinaryRow, Integer, Integer> {
+
+ public static final int MAX_VALUES = 1000;
+
+ private final BucketFunctionType bucketFunctionType;
+ private final RowType rowType;
+ private final RowType partitionType;
+ private final RowType bucketKeyType;
+ private final Predicate predicate;
+ private final Map<BinaryRow, Optional<PartitionSelector>>
partitionSelectors;
+
+ public BucketSelector(
+ Predicate predicate,
+ BucketFunctionType bucketFunctionType,
+ RowType rowType,
+ RowType partitionType,
+ RowType bucketKeyType) {
+ this.predicate = predicate;
+ this.bucketFunctionType = bucketFunctionType;
+ this.rowType = rowType;
+ this.partitionType = partitionType;
+ this.bucketKeyType = bucketKeyType;
+ this.partitionSelectors = new ConcurrentHashMap<>();
+ }
- int MAX_VALUES = 1000;
+ @Override
+ public boolean test(BinaryRow partition, Integer bucket, Integer
numBucket) {
+ return partitionSelectors
+ .computeIfAbsent(partition, this::createPartitionSelector)
+ .map(selector -> selector.test(bucket, numBucket))
+ .orElse(true);
+ }
- Optional<BiFilter<Integer, Integer>> convert(Predicate predicate);
+ private Optional<PartitionSelector> createPartitionSelector(BinaryRow
partition) {
+ Optional<Predicate> partRemoved =
+ predicate.visit(
+ new PartitionValuePredicateVisitor(rowType,
partitionType, partition));
+ if (!partRemoved.isPresent()) {
+ return Optional.empty();
+ }
+
+ List<Predicate> bucketFilters =
+ pickTransformFieldMapping(
+ splitAnd(partRemoved.get()),
+ rowType.getFieldNames(),
+ bucketKeyType.getFieldNames());
+ if (bucketFilters.isEmpty()) {
+ return Optional.empty();
+ }
- static Optional<BiFilter<Integer, Integer>> create(
- Predicate bucketPredicate,
- RowType bucketKeyType,
- BucketFunctionType bucketFunctionType) {
+ return createPartitionSelector(and(bucketFilters));
+ }
+
+ private Optional<PartitionSelector> createPartitionSelector(Predicate
bucketPredicate) {
@SuppressWarnings("unchecked")
List<Object>[] bucketValues = new List[bucketKeyType.getFieldCount()];
@@ -94,11 +143,16 @@ public interface BucketSelectConverter {
}
if (reference != null) {
if (bucketValues[reference] != null) {
- // Repeated equals in And?
- return Optional.empty();
+ // Same field appears in multiple AND branches,
+ // compute intersection to narrow down possible values
+ bucketValues[reference].retainAll(new HashSet<>(values));
+ if (bucketValues[reference].isEmpty()) {
+                        // Empty intersection (contradictory conditions):
give up and scan all buckets
+ return Optional.empty();
+ }
+ } else {
+ bucketValues[reference] = values;
}
-
- bucketValues[reference] = values;
}
}
@@ -124,10 +178,10 @@ public interface BucketSelectConverter {
new ArrayList<>(),
0);
- return Optional.of(new Selector(bucketKeys, bucketFunction));
+ return Optional.of(new PartitionSelector(bucketKeys, bucketFunction));
}
- static void assembleRows(
+ private static void assembleRows(
List<Object>[] rowValues,
Consumer<List<Object>> consumer,
List<Object> stack,
@@ -145,9 +199,8 @@ public interface BucketSelectConverter {
}
}
- /** Selector to select bucket from {@link Predicate}. */
@ThreadSafe
- class Selector implements BiFilter<Integer, Integer> {
+ private static class PartitionSelector implements BiFilter<Integer,
Integer> {
private final List<BinaryRow> bucketKeys;
@@ -155,7 +208,7 @@ public interface BucketSelectConverter {
private final Map<Integer, Set<Integer>> buckets = new
ConcurrentHashMap<>();
- public Selector(List<BinaryRow> bucketKeys, BucketFunction
bucketFunction) {
+ public PartitionSelector(List<BinaryRow> bucketKeys, BucketFunction
bucketFunction) {
this.bucketKeys = bucketKeys;
this.bucketFunction = bucketFunction;
}
@@ -166,8 +219,7 @@ public interface BucketSelectConverter {
.contains(bucket);
}
- @VisibleForTesting
- Set<Integer> createBucketSet(int numBucket) {
+ private Set<Integer> createBucketSet(int numBucket) {
ImmutableSet.Builder<Integer> builder = new
ImmutableSet.Builder<>();
for (BinaryRow key : bucketKeys) {
builder.add(bucketFunction.bucket(key, numBucket));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 1e044f810f..8f543ca7d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -35,6 +35,7 @@ import org.apache.paimon.utils.BiFilter;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RowRangeIndex;
+import org.apache.paimon.utils.TriFilter;
import javax.annotation.Nullable;
@@ -56,13 +57,15 @@ public interface FileStoreScan {
FileStoreScan withPartitionFilter(PartitionPredicate predicate);
+ FileStoreScan withCompleteFilter(Predicate predicate);
+
FileStoreScan withBucket(int bucket);
FileStoreScan onlyReadRealBuckets();
FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
- FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer>
bucketFilter);
+ FileStoreScan withTotalAwareBucketFilter(TriFilter<BinaryRow, Integer,
Integer> bucketFilter);
FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a09fb25ee6..8595753cf0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -116,6 +116,11 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
this.keyFilter = predicate;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withCompleteFilter(Predicate predicate) {
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 5e4cffcbf6..a2fee49bfb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -93,8 +93,6 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
- PrimaryKeyTableUtils.addKeyNamePrefix(
- tableSchema.logicalBucketKeyType()),
keyType,
rowType,
extractor,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a6710ff008..67d0b9a566 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -241,6 +241,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
if (!pair.getRight().isEmpty()) {
nonPartitionFilterConsumer.accept(scan,
PredicateBuilder.and(pair.getRight()));
}
+ scan.withCompleteFilter(predicate);
return this;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 8142d44acf..09595d9188 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -134,7 +134,6 @@ public class TestFileStore extends KeyValueFileStore {
options,
partitionType,
keyType,
- keyType,
valueType,
keyValueFieldsExtractor,
mfFactory,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
deleted file mode 100644
index eeec69d202..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.utils.BiFilter;
-import org.apache.paimon.utils.Filter;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketFilter}. */
-public class BucketFilterTest {
-
- @Test
- public void testCreateWithAllNullParameters() {
- // Test that create method returns null when all parameters are
null/false
- assertThat(BucketFilter.create(false, null, null, null)).isNull();
- }
-
- @Test
- public void testCreateWithOnlyReadRealBuckets() {
- // Test that create method returns a BucketFilter when
onlyReadRealBuckets is true
- BucketFilter filter = BucketFilter.create(true, null, null, null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testCreateWithSpecifiedBucket() {
- // Test that create method returns a BucketFilter when specifiedBucket
is not null
- BucketFilter filter = BucketFilter.create(false, 1, null, null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isEqualTo(1);
- }
-
- @Test
- public void testCreateWithBucketFilter() {
- // Test that create method returns a BucketFilter when bucketFilter is
not null
- Filter<Integer> bucketFilter = value -> value > 0;
- BucketFilter filter = BucketFilter.create(false, null, bucketFilter,
null);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testCreateWithTotalAwareBucketFilter() {
- // Test that create method returns a BucketFilter when
totalAwareBucketFilter is not null
- BiFilter<Integer, Integer> totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket;
- BucketFilter filter = BucketFilter.create(false, null, null,
totalAwareBucketFilter);
- assertThat(filter).isNotNull();
- assertThat(filter.specifiedBucket()).isNull();
- }
-
- @Test
- public void testTestWithOnlyReadRealBuckets() {
- // Test the test method with onlyReadRealBuckets parameter
- BucketFilter filter = BucketFilter.create(true, null, null, null);
-
- // Real buckets (non-negative) should pass
- assertThat(filter.test(0, 1)).isTrue();
- assertThat(filter.test(1, 2)).isTrue();
-
- // Virtual buckets (negative) should not pass
- assertThat(filter.test(-1, 1)).isFalse();
- assertThat(filter.test(-2, 2)).isFalse();
- }
-
- @Test
- public void testTestWithSpecifiedBucket() {
- // Test the test method with specifiedBucket parameter
- BucketFilter filter = BucketFilter.create(false, 1, null, null);
-
- // Only the specified bucket should pass
- assertThat(filter.test(1, 2)).isTrue();
-
- // Other buckets should not pass
- assertThat(filter.test(0, 2)).isFalse();
- assertThat(filter.test(2, 3)).isFalse();
- }
-
- @Test
- public void testTestWithBucketFilter() {
- // Test the test method with bucketFilter parameter
- Filter<Integer> bucketFilter = value -> value % 2 == 0; // Even
buckets only
- BucketFilter filter = BucketFilter.create(false, null, bucketFilter,
null);
-
- // Even buckets should pass
- assertThat(filter.test(0, 1)).isTrue();
- assertThat(filter.test(2, 3)).isTrue();
- assertThat(filter.test(4, 5)).isTrue();
-
- // Odd buckets should not pass
- assertThat(filter.test(1, 2)).isFalse();
- assertThat(filter.test(3, 4)).isFalse();
- assertThat(filter.test(5, 6)).isFalse();
- }
-
- @Test
- public void testTestWithTotalAwareBucketFilter() {
- // Test the test method with totalAwareBucketFilter parameter
- BiFilter<Integer, Integer> totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket / 2;
- BucketFilter filter = BucketFilter.create(false, null, null,
totalAwareBucketFilter);
-
- // Buckets less than half of totalBucket should pass
- assertThat(filter.test(0, 4)).isTrue();
- assertThat(filter.test(1, 4)).isTrue();
-
- // Buckets greater than or equal to half of totalBucket should not pass
- assertThat(filter.test(2, 4)).isFalse();
- assertThat(filter.test(3, 4)).isFalse();
- }
-
- @Test
- public void testTestWithMultipleFilters() {
- // Test the test method with multiple filters combined
- Filter<Integer> bucketFilter = value -> value > 0; // Positive buckets
only
- BiFilter<Integer, Integer> totalAwareBucketFilter =
- (bucket, totalBucket) -> bucket < totalBucket - 1;
- BucketFilter filter = BucketFilter.create(true, 1, bucketFilter,
totalAwareBucketFilter);
-
- // Bucket 1 is positive, is the specified bucket, and is less than
totalBucket-1 for
- // totalBucket=3
- assertThat(filter.test(1, 3)).isTrue();
-
- // Bucket 0 is not positive, so it should not pass
- assertThat(filter.test(0, 3)).isFalse();
-
- // Bucket 2 is not the specified bucket, so it should not pass
- assertThat(filter.test(2, 3)).isFalse();
-
- // Bucket 1 with totalBucket=2 should not pass because 1 >= 2-1
- assertThat(filter.test(1, 2)).isFalse();
-
- // Negative bucket should not pass because onlyReadRealBuckets is true
- assertThat(filter.test(-1, 3)).isFalse();
- }
-
- @Test
- public void testSpecifiedBucket() {
- // Test the specifiedBucket method
- BucketFilter filterWithSpecifiedBucket = BucketFilter.create(false, 2,
null, null);
- assertThat(filterWithSpecifiedBucket.specifiedBucket()).isEqualTo(2);
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
deleted file mode 100644
index dc64bdb596..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.operation.BucketSelectConverter.Selector;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static org.apache.paimon.predicate.PredicateBuilder.or;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketSelectConverter}. */
-public class BucketSelectConverterTest {
-
- private final RowType rowType = RowType.of(new IntType(), new IntType(),
new IntType());
-
- private final PredicateBuilder builder = new PredicateBuilder(rowType);
-
- @Test
- public void testRepeatEqual() {
- assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0,
1)))).isEmpty();
- }
-
- @Test
- public void testNotFull() {
- assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty();
- }
-
- @Test
- public void testOtherPredicate() {
- assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty();
- }
-
- @Test
- public void testOrIllegal() {
- assertThat(
- newSelector(
- and(
- or(builder.equal(0, 5),
builder.equal(1, 6)),
- builder.equal(1, 1),
- builder.equal(2, 2))))
- .isEmpty();
- }
-
- @Test
- public void testNormal() {
- Selector selector =
- newSelector(and(builder.equal(0, 0), builder.equal(1, 1),
builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(11);
- }
-
- @Test
- public void testIn() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testOr() {
- Selector selector =
- newSelector(
- and(
- or(
- builder.equal(0, 5),
- builder.equal(0, 6),
- builder.equal(0, 7)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testInNull() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7,
null)),
- builder.equal(1, 1),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
- }
-
- @Test
- public void testMultipleIn() {
- Selector selector =
- newSelector(
- and(
- builder.in(0, Arrays.asList(5, 6, 7)),
- builder.in(1, Arrays.asList(1, 8)),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9,
10, 19);
- }
-
- @Test
- public void testMultipleOr() {
- Selector selector =
- newSelector(
- and(
- or(
- builder.equal(0, 5),
- builder.equal(0, 6),
- builder.equal(0, 7)),
- or(builder.equal(1, 1),
builder.equal(1, 8)),
- builder.equal(2, 2)))
- .get();
- assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9,
10, 19);
- }
-
- private Optional<Selector> newSelector(Predicate predicate) {
- return (Optional)
- BucketSelectConverter.create(predicate, rowType,
BucketFunctionType.DEFAULT);
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
new file mode 100644
index 0000000000..3128ff20db
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions.BucketFunctionType;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BucketSelector}. */
+public class BucketSelectorTest {
+
+ private static final int NUM_BUCKETS = 10;
+
+ // ========================== Single bucket key, non-partitioned
==========================
+
+ @Test
+ public void testEqualPredicate() {
+ // k = 5 => should select exactly one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = pb.equal(0, 5);
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testEqualAndRangePredicate() {
+ // k = 5 AND k < 100 => should still select bucket for k=5
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5),
pb.lessThan(0, 100));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testEqualAndInWithOverlap() {
+ // k = 5 AND k IN (5, 10) => intersection is {5}, should select one
bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.in(0,
Arrays.asList(5, 10)));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testInAndInWithOverlap() {
+ // k IN (1, 5) AND k IN (5, 10) => intersection is {5}, should select
one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate =
+ PredicateBuilder.and(pb.in(0, Arrays.asList(1, 5)), pb.in(0,
Arrays.asList(5, 10)));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testRedundantEquals() {
+ // k = 5 AND k = 5 => redundant, should still select one bucket
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0,
5));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testContradictoryEquals() {
+ // k = 5 AND k = 10 => empty intersection, no bucket can match
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0,
10));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ // Empty intersection => Optional.empty() => orElse(true) => all
buckets pass
+ // (conservative: no filtering when we can't determine)
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(NUM_BUCKETS);
+ }
+
+ @Test
+ public void testRangeOnlyFallsBackToFullScan() {
+ // k < 100 => no Equal/In to extract, full scan
+ RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ RowType partType = RowType.of();
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = pb.lessThan(0, 100);
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(NUM_BUCKETS);
+ }
+
+ // ========================== Multi-field bucket key
==========================
+
+ @Test
+ public void testMultiFieldBucketKey() {
+ // k1 = 5 AND k2 = 10 => one combination, one bucket
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k1", DataTypes.INT()),
+ DataTypes.FIELD(1, "k2", DataTypes.INT()));
+ RowType partType = RowType.of();
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(1,
10));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, rowType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ @Test
+ public void testMultiFieldWithIntersection() {
+ // k1 IN (1, 5) AND k1 IN (5, 10) AND k2 = 3 => k1={5}, k2={3} => one
combination
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "k1", DataTypes.INT()),
+ DataTypes.FIELD(1, "k2", DataTypes.INT()));
+ RowType partType = RowType.of();
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate =
+ PredicateBuilder.and(
+ pb.in(0, Arrays.asList(1, 5)),
+ pb.in(0, Arrays.asList(5, 10)),
+ pb.equal(1, 3));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, rowType);
+
+ Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW,
NUM_BUCKETS);
+ assertThat(selected).hasSize(1);
+ }
+
+ // ========================== Partitioned table ==========================
+
+ @Test
+ public void testPartitionedTableWithBucketFilter() {
+ // Table: (pt INT, k INT), partition: (pt), bucket key: (k)
+ // Predicate: pt = 1 AND k = 5
+ RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "pt", DataTypes.INT()),
+ DataTypes.FIELD(1, "k", DataTypes.INT()));
+ RowType partType = DataTypes.ROW(DataTypes.FIELD(0, "pt",
DataTypes.INT()));
+ RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k",
DataTypes.INT()));
+ PredicateBuilder pb = new PredicateBuilder(rowType);
+
+ Predicate predicate = PredicateBuilder.and(pb.equal(0, 1), pb.equal(1,
5));
+ BucketSelector selector =
+ new BucketSelector(
+ predicate, BucketFunctionType.DEFAULT, rowType,
partType, bucketKeyType);
+
+ // For partition pt=1 (matching), bucket filtering should work
+ BinaryRow partition1 =
+ new
org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+ .toBinaryRow(org.apache.paimon.data.GenericRow.of(1))
+ .copy();
+ Set<Integer> selected1 = selectedBuckets(selector, partition1,
NUM_BUCKETS);
+ assertThat(selected1).hasSize(1);
+
+ // For partition pt=2 (not matching), predicate becomes AlwaysFalse
+ // => no bucket key values extracted => full scan (conservative)
+ BinaryRow partition2 =
+ new
org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+ .toBinaryRow(org.apache.paimon.data.GenericRow.of(2))
+ .copy();
+ Set<Integer> selected2 = selectedBuckets(selector, partition2,
NUM_BUCKETS);
+ assertThat(selected2).hasSize(NUM_BUCKETS);
+ }
+
+ // ========================== Helpers ==========================
+
+ private static Set<Integer> selectedBuckets(
+ BucketSelector selector, BinaryRow partition, int numBuckets) {
+ Set<Integer> selected = new HashSet<>();
+ for (int b = 0; b < numBuckets; b++) {
+ if (selector.test(partition, b, numBuckets)) {
+ selected.add(b);
+ }
+ }
+ return selected;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
new file mode 100644
index 0000000000..94494e0406
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
 * Tests bucket filtering with compound predicates on a partitioned, fixed-bucket, append-only
 * table.
 */
public class BucketFilterScanTest extends TableTestBase {

    @Test
    public void testBucketFilterWithCompoundPredicateOnAppendTable() throws Exception {
        testBucketFilterWithCompoundPredicate(false);
    }

    @Test
    public void testBucketFilterWithCompoundPredicateOnPkTable() throws Exception {
        testBucketFilterWithCompoundPredicate(true);
    }

    @Test
    public void testCompositeBucketFilterWithCompoundPredicateOnAppendTable() throws Exception {
        testCompositeBucketFilterWithCompoundPredicate(false);
    }

    @Test
    public void testCompositeBucketFilterWithCompoundPredicateOnPkTable() throws Exception {
        testCompositeBucketFilterWithCompoundPredicate(true);
    }

    /**
     * Tests bucket filtering with compound predicates on a single-field bucket key.
     *
     * <p>Table schema:
     *
     * <ul>
     *   <li>Partition key: column 'a' (INT)
     *   <li>Bucket key: column 'b' (INT)
     *   <li>Bucket count: 10
     * </ul>
     *
     * <p>Data distribution: 5 partitions (a=1 to 5) &times; 20 b-values (b=1 to 20) = 100 rows.
     *
     * <p>Test scenarios:
     *
     * <ol>
     *   <li>Predicate: (a &lt; 3 AND b = 5) OR (a = 3 AND b = 7) - Tests partition range filter
     *       with bucket equality, combined with OR. Expected: buckets for partition 1,2 with b=5
     *       and partition 3 with b=7.
     *   <li>Predicate: (a &lt; 3 AND b = 5) OR (a = 3 AND b &lt; 100) - Tests partition range with
     *       bucket equality, OR partition equality with bucket range. Expected: mixed buckets from
     *       partition 3 and specific buckets from partitions 1,2.
     *   <li>Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests partition equality with
     *       bucket equality in both OR branches. Expected: exact bucket matching for each
     *       partition-b combination.
     * </ol>
     *
     * @param pk true to exercise the primary-key table path (bucket key derived from the pk),
     *     false for an append table with an explicit bucket key
     */
    private void testBucketFilterWithCompoundPredicate(boolean pk) throws Exception {
        // ---- schema & table ----
        Schema.Builder builder =
                Schema.newBuilder()
                        .column("a", DataTypes.INT())
                        .column("b", DataTypes.INT())
                        .column("c", DataTypes.INT())
                        .partitionKeys("a")
                        .option(BUCKET.key(), "10");
        if (pk) {
            // pk tables bucket by the non-partition pk column(s), here 'b'
            builder.primaryKey("a", "b");
        } else {
            builder.option(BUCKET_KEY.key(), "b");
        }
        Schema schema = builder.build();

        Identifier tableId = identifier("test_bucket_filter");
        catalog.createTable(tableId, schema, false);
        Table table = catalog.getTable(tableId);

        // ---- write data: 5 partitions × 20 b-values = 100 rows ----
        GenericRow[] rows = new GenericRow[100];
        int idx = 0;
        for (int a = 1; a <= 5; a++) {
            for (int b = 1; b <= 20; b++) {
                rows[idx++] = GenericRow.of(a, b, a * 100 + b);
            }
        }
        write(table, rows);
        PredicateBuilder pb = new PredicateBuilder(table.rowType());

        // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) ----
        Predicate predicate1 =
                PredicateBuilder.or(
                        PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
                        PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
        // expected splits are encoded as "partitionA,bucket" by plan()
        assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("3,1", "1,6", "2,6");

        // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) ----
        Predicate predicate2 =
                PredicateBuilder.or(
                        PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
                        PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100)));
        assertThat(plan(table, predicate2))
                .containsExactlyInAnyOrder(
                        "3,0", "3,1", "1,6", "3,4", "3,5", "2,6", "3,6", "3,7", "3,8");

        // ---- build predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) ----
        Predicate predicate3 =
                PredicateBuilder.or(
                        PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
                        PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
        assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("3,1", "2,6");
    }

    /**
     * Tests bucket filtering with compound predicates on a composite (multi-field) bucket key.
     *
     * <p>Table schema:
     *
     * <ul>
     *   <li>Partition key: column 'a' (INT)
     *   <li>Bucket key: columns 'b' and 'c' (composite, INT)
     *   <li>Bucket count: 10
     * </ul>
     *
     * <p>Data distribution: 5 partitions (a=1 to 5) &times; 20 b-values (b=1 to 20) &times; 10
     * c-values (c=0 to 9) = 1000 rows.
     *
     * <p>Test scenarios:
     *
     * <ol>
     *   <li>Predicate: ((a &lt; 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests nested OR
     *       within AND, with partition range, bucket field equality, and additional bucket field
     *       filter. The 'c = 5' condition is part of the composite bucket key, affecting bucket
     *       selection.
     *   <li>Predicate: ((a &lt; 3 AND b = 5) OR (a = 3 AND b &lt; 100)) AND c = 5 - Tests range
     *       predicate on one bucket field (b) combined with equality on another (c). Validates
     *       handling of multiple bucket key fields with different predicate types.
     *   <li>Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests exact matching on
     *       both partition and bucket fields. The composite bucket key (b,c) ensures precise bucket
     *       targeting.
     * </ol>
     *
     * @param pk true to exercise the primary-key table path, false for an append table with an
     *     explicit composite bucket key
     */
    private void testCompositeBucketFilterWithCompoundPredicate(boolean pk) throws Exception {
        // ---- schema & table ----
        Schema.Builder builder =
                Schema.newBuilder()
                        .column("a", DataTypes.INT())
                        .column("b", DataTypes.INT())
                        .column("c", DataTypes.INT())
                        .partitionKeys("a")
                        .option(BUCKET.key(), "10");
        if (pk) {
            // pk tables bucket by the non-partition pk columns, here (b, c)
            builder.primaryKey("a", "b", "c");
        } else {
            builder.option(BUCKET_KEY.key(), "b,c");
        }
        Schema schema = builder.build();

        Identifier tableId = identifier("test_composite_bucket_filter");
        catalog.createTable(tableId, schema, false);
        Table table = catalog.getTable(tableId);

        // ---- write data: 5 partitions × 20 b-values x 10 c-values = 1000 rows ----
        GenericRow[] rows = new GenericRow[1000];
        int idx = 0;
        for (int a = 1; a <= 5; a++) {
            for (int b = 1; b <= 20; b++) {
                for (int c = 0; c < 10; c++) {
                    rows[idx++] = GenericRow.of(a, b, c);
                }
            }
        }
        write(table, rows);
        PredicateBuilder pb = new PredicateBuilder(table.rowType());

        // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
        Predicate predicate1 =
                PredicateBuilder.and(
                        PredicateBuilder.or(
                                PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
                                PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
                        pb.equal(2, 5));
        // expected splits are encoded as "partitionA,bucket" by plan()
        assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("1,0", "2,0", "3,5");

        // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 ----
        Predicate predicate2 =
                PredicateBuilder.and(
                        PredicateBuilder.or(
                                PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
                                PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100))),
                        pb.equal(2, 5));
        assertThat(plan(table, predicate2))
                .containsExactlyInAnyOrder(
                        "3,9", "1,0", "2,0", "3,0", "3,1", "3,2", "3,3", "3,4", "3,5", "3,6", "3,7",
                        "3,8");

        // ---- build predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
        Predicate predicate3 =
                PredicateBuilder.and(
                        PredicateBuilder.or(
                                PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
                                PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
                        pb.equal(2, 5));
        assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("2,0", "3,5");
    }

    /**
     * Plans a scan with the given filter and encodes each resulting split as
     * "{@code <partition a-value>,<bucket id>}" for compact set assertions.
     */
    private Set<String> plan(Table table, Predicate predicate) {
        return table.newReadBuilder().withFilter(predicate).newScan().plan().splits().stream()
                .map(
                        split -> {
                            DataSplit dataSplit = (DataSplit) split;
                            // partition has a single INT field 'a' at position 0
                            int partitionA = dataSplit.partition().getInt(0);
                            int bucket = dataSplit.bucket();
                            return partitionA + "," + bucket;
                        })
                .collect(Collectors.toSet());
    }
}