This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 95b461d5887b fix: Fix predicates for base file reader in Flink
FileGroup reader (#14197)
95b461d5887b is described below
commit 95b461d5887b4fa6c69846b6e56cb214006151b5
Author: Shuo Cheng <[email protected]>
AuthorDate: Tue Nov 4 09:48:23 2025 +0800
fix: Fix predicates for base file reader in Flink FileGroup reader (#14197)
---
.../apache/hudi/source/ExpressionPredicates.java | 31 ++++
.../table/format/FlinkRowDataReaderContext.java | 49 +++++-
.../hudi/source/TestExpressionPredicates.java | 165 +++++++++++++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 42 ++++++
4 files changed, 281 insertions(+), 6 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
index 641196a66b64..32ae005f0566 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/ExpressionPredicates.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -177,6 +178,11 @@ public class ExpressionPredicates {
* @return A filter predicate of parquet file.
*/
FilterPredicate filter();
+
+ /**
+ * List of columns that are referenced by this filter.
+ */
+ List<String> references();
}
/**
@@ -231,6 +237,11 @@ public class ExpressionPredicates {
return toParquetPredicate(getFunctionDefinition(), literalType,
columnName, convertedLiteral);
}
+ @Override
+ public List<String> references() {
+ return Collections.singletonList(columnName);
+ }
+
/**
* Returns function definition of predicate.
*
@@ -485,6 +496,11 @@ public class ExpressionPredicates {
public FilterPredicate filter() {
return null;
}
+
+ @Override
+ public List<String> references() {
+ return Collections.emptyList();
+ }
}
/**
@@ -523,6 +539,11 @@ public class ExpressionPredicates {
return not(filterPredicate);
}
+ @Override
+ public List<String> references() {
+ return predicate.references();
+ }
+
@Override
public String toString() {
return "NOT(" + predicate.toString() + ")";
@@ -566,6 +587,11 @@ public class ExpressionPredicates {
return and(filterPredicate0, filterPredicate1);
}
+ @Override
+ public List<String> references() {
+ return
Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList());
+ }
+
@Override
public String toString() {
return "AND(" + Arrays.toString(predicates) + ")";
@@ -609,6 +635,11 @@ public class ExpressionPredicates {
return or(filterPredicate0, filterPredicate1);
}
+ @Override
+ public List<String> references() {
+ return
Arrays.stream(predicates).map(Predicate::references).flatMap(List::stream).distinct().collect(Collectors.toList());
+ }
+
@Override
public String toString() {
return "OR(" + Arrays.toString(predicates) + ")";
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 3d2272d9d9fc..5e1f646104b7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.model.BootstrapRowData;
import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
@@ -43,6 +44,7 @@ import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RecordKeyToRowDataConverter;
@@ -54,32 +56,38 @@ import org.apache.flink.table.types.logical.RowType;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
+import static
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
/**
* Implementation of {@link HoodieReaderContext} to read {@link RowData}s from
base files or
* log files with Flink parquet reader.
*/
public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
- private final List<ExpressionPredicates.Predicate> predicates;
+ private final List<ExpressionPredicates.Predicate> allPredicates;
+ private final Lazy<List<ExpressionPredicates.Predicate>>
lazyBootstrapSafeFilters;
+ private final Lazy<List<ExpressionPredicates.Predicate>> lazyMorSafeFilters;
+ private final Lazy<List<String>> lazyRecordKeys;
private final Supplier<InternalSchemaManager> internalSchemaManager;
- private final HoodieTableConfig tableConfig;
public FlinkRowDataReaderContext(
StorageConfiguration<?> storageConfiguration,
Supplier<InternalSchemaManager> internalSchemaManager,
- List<ExpressionPredicates.Predicate> predicates,
+ List<ExpressionPredicates.Predicate> allPredicates,
HoodieTableConfig tableConfig,
Option<InstantRange> instantRangeOpt) {
super(storageConfiguration, tableConfig, instantRangeOpt, Option.empty(),
new FlinkRecordContext(tableConfig, storageConfiguration));
- this.tableConfig = tableConfig;
this.internalSchemaManager = internalSchemaManager;
- this.predicates = predicates;
+ this.allPredicates = allPredicates;
+ this.lazyBootstrapSafeFilters = Lazy.lazily(() ->
allPredicates.stream().filter(this::filterIsSafeForBootstrap).collect(Collectors.toList()));
+ this.lazyMorSafeFilters = Lazy.lazily(() ->
allPredicates.stream().filter(this::filterIsSafeForMorMerging).collect(Collectors.toList()));
+ this.lazyRecordKeys = Lazy.lazily(() ->
tableConfig.getRecordKeyFields().map(keys ->
Arrays.stream(keys).collect(Collectors.toList())).orElse(Collections.emptyList()));
}
@Override
@@ -99,7 +107,7 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
DataType rowType =
RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
- return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, predicates);
+ return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, getSafePredicates(requiredSchema));
}
@Override
@@ -198,4 +206,33 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
pkFieldsPos, (RowType)
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
((FlinkRecordContext)
recordContext).setRecordKeyRowConverter(recordKeyRowConverter);
}
+
+ private List<ExpressionPredicates.Predicate> getSafePredicates(Schema
requiredSchema) {
+ boolean hasRowIndexField =
AvroSchemaUtils.containsFieldInSchema(requiredSchema,
ROW_INDEX_TEMPORARY_COLUMN_NAME);
+ if (!getHasLogFiles() && !getNeedsBootstrapMerge()) {
+ return allPredicates;
+ } else if (!getHasLogFiles() && hasRowIndexField) {
+ return lazyBootstrapSafeFilters.get();
+ } else {
+ return lazyMorSafeFilters.get();
+ }
+ }
+
+ /**
+ * Only valid if there is support for RowIndexField and no log files.
+ * Filters are safe for bootstrap if meta col filters are independent from
data col filters.
+ */
+ private boolean filterIsSafeForBootstrap(ExpressionPredicates.Predicate
predicate) {
+ long metaRefCount = predicate.references().stream().filter(c ->
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase())).count();
+ return metaRefCount == predicate.references().size() || metaRefCount == 0;
+ }
+
+ /**
+ * Only valid if the filter's references only include primary key columns or
{@link HoodieRecord#RECORD_KEY_METADATA_FIELD},
+ * because it's necessary to ensure both records with the same record key in
the base file and log file are either filtered out
+ * or retained, to make the later merging process correct.
+ */
+ private boolean filterIsSafeForMorMerging(ExpressionPredicates.Predicate
predicate) {
+ return predicate.references().stream().allMatch(c ->
lazyRecordKeys.get().contains(c.toLowerCase()) ||
c.equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
index 1ea00104b4d3..55698ae8c048 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionPredicates.java
@@ -69,6 +69,7 @@ import static
org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for {@link ExpressionPredicates}.
@@ -243,4 +244,168 @@ public class TestExpressionPredicates {
ExpressionPredicates.ColumnPredicate predicate =
Equals.getInstance().bindFieldReference(fieldReference).bindValueLiteral(valueLiteral);
assertDoesNotThrow(predicate::filter, () -> String.format("Convert from %s
to %s failed", literalValue.getClass().getName(), dataType));
}
+
+ @Test
+ public void testUnaryPredicateReferences() {
+ // Test Equals predicate references method
+ FieldReferenceExpression fieldReference = new
FieldReferenceExpression("field1", DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1");
+
+ ExpressionPredicates.ColumnPredicate equalsPredicate = Equals.getInstance()
+ .bindFieldReference(fieldReference)
+ .bindValueLiteral(valueLiteral);
+
+ assertEquals(Collections.singletonList("field1"),
equalsPredicate.references());
+
+ // Test GreaterThan predicate references method
+ valueLiteral = new ValueLiteralExpression(10);
+ ExpressionPredicates.ColumnPredicate gtPredicate =
GreaterThan.getInstance()
+ .bindFieldReference(fieldReference)
+ .bindValueLiteral(valueLiteral);
+ assertEquals(Collections.singletonList("field1"),
gtPredicate.references());
+
+ ValueLiteralExpression valueLiteral1 = new
ValueLiteralExpression("value1");
+ ValueLiteralExpression valueLiteral2 = new
ValueLiteralExpression("value2");
+
+ ExpressionPredicates.ColumnPredicate inPredicate = In.getInstance()
+ .bindValueLiterals(Arrays.asList(valueLiteral1, valueLiteral2))
+ .bindFieldReference(fieldReference);
+
+ assertEquals(Collections.singletonList("field1"),
inPredicate.references());
+ }
+
+ @Test
+ public void testNotPredicateReferences() {
+ // Test Not predicate references method - should delegate to underlying
predicate
+ FieldReferenceExpression fieldReference = new
FieldReferenceExpression("field4", DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression valueLiteral = new ValueLiteralExpression("value1");
+
+ ExpressionPredicates.ColumnPredicate innerPredicate = Equals.getInstance()
+ .bindFieldReference(fieldReference)
+ .bindValueLiteral(valueLiteral);
+
+ Predicate notPredicate = Not.getInstance().bindPredicate(innerPredicate);
+
+ List<String> references = notPredicate.references();
+ assertEquals(1, references.size());
+ assertEquals("field4", references.get(0));
+ }
+
+ @Test
+ public void testAndPredicateReferencesWithSameColumn() {
+ // Test And predicate references method with same column
+ FieldReferenceExpression fieldReference1 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ FieldReferenceExpression fieldReference2 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20);
+
+ ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+ .bindFieldReference(fieldReference1)
+ .bindValueLiteral(valueLiteral1);
+
+ ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+ .bindFieldReference(fieldReference2)
+ .bindValueLiteral(valueLiteral2);
+
+ Predicate andPredicate = And.getInstance().bindPredicates(predicate1,
predicate2);
+
+ List<String> references = andPredicate.references();
+ assertEquals(1, references.size());
+ assertEquals("field1", references.get(0));
+ }
+
+ @Test
+ public void testAndPredicateReferencesWithDifferentColumns() {
+ // Test And predicate references method with different columns
+ FieldReferenceExpression fieldReference1 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ FieldReferenceExpression fieldReference2 = new
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+
+ ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+ .bindFieldReference(fieldReference1)
+ .bindValueLiteral(valueLiteral1);
+
+ ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+ .bindFieldReference(fieldReference2)
+ .bindValueLiteral(valueLiteral2);
+
+ Predicate andPredicate = And.getInstance().bindPredicates(predicate1,
predicate2);
+
+ assertEquals(Arrays.asList("field1", "field2"), andPredicate.references());
+ }
+
+ @Test
+ public void testOrPredicateReferencesWithSameColumn() {
+ // Test Or predicate references method with same column
+ FieldReferenceExpression fieldReference1 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ FieldReferenceExpression fieldReference2 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression(20);
+
+ ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+ .bindFieldReference(fieldReference1)
+ .bindValueLiteral(valueLiteral1);
+
+ ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+ .bindFieldReference(fieldReference2)
+ .bindValueLiteral(valueLiteral2);
+
+ Predicate orPredicate = Or.getInstance().bindPredicates(predicate1,
predicate2);
+
+ List<String> references = orPredicate.references();
+ assertEquals(1, references.size());
+ assertEquals("field1", references.get(0));
+ }
+
+ @Test
+ public void testOrPredicateReferencesWithDifferentColumns() {
+ // Test Or predicate references method with different columns
+ FieldReferenceExpression fieldReference1 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ FieldReferenceExpression fieldReference2 = new
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+
+ ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+ .bindFieldReference(fieldReference1)
+ .bindValueLiteral(valueLiteral1);
+
+ ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+ .bindFieldReference(fieldReference2)
+ .bindValueLiteral(valueLiteral2);
+
+ Predicate orPredicate = Or.getInstance().bindPredicates(predicate1,
predicate2);
+ assertEquals(Arrays.asList("field1", "field2"), orPredicate.references());
+ }
+
+ @Test
+ public void testAlwaysNullPredicateReferences() {
+ // Test AlwaysNull predicate references method - should return empty list
+ Predicate alwaysNullPredicate =
ExpressionPredicates.AlwaysNull.getInstance();
+
+ List<String> references = alwaysNullPredicate.references();
+ assertEquals(0, references.size());
+ assertTrue(references.isEmpty());
+ }
+
+ @Test
+ public void testNestedPredicateReferences() {
+ // Test nested predicates (AND inside NOT)
+ FieldReferenceExpression fieldReference1 = new
FieldReferenceExpression("field1", DataTypes.INT(), 0, 0);
+ FieldReferenceExpression fieldReference2 = new
FieldReferenceExpression("field2", DataTypes.STRING(), 0, 0);
+ ValueLiteralExpression valueLiteral1 = new ValueLiteralExpression(10);
+ ValueLiteralExpression valueLiteral2 = new ValueLiteralExpression("test");
+
+ ExpressionPredicates.ColumnPredicate predicate1 = GreaterThan.getInstance()
+ .bindFieldReference(fieldReference1)
+ .bindValueLiteral(valueLiteral1);
+
+ ExpressionPredicates.ColumnPredicate predicate2 = Equals.getInstance()
+ .bindFieldReference(fieldReference2)
+ .bindValueLiteral(valueLiteral2);
+
+ Predicate andPredicate = And.getInstance().bindPredicates(predicate1,
predicate2);
+ Predicate notPredicate = Not.getInstance().bindPredicate(andPredicate);
+ assertEquals(Arrays.asList("field1", "field2"), notPredicate.references());
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index b59b51c035c0..d8f14601ed85 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1975,6 +1975,48 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02,
par1]]");
}
+ @Test
+ void testPredicateForBaseFileWithMor() {
+ // Case:
+ // * records in base file do not satisfy the predicate
+ // * records in log file satisfy the predicate
+ // * records in base file have higher ordering value
+ // E.g., base file: (uuid:'k1', age: 23, ts: 1003)
+ // log file: (uuid: 'k1', age: 25, ts: 1001)
+ // query filter: age = 25;
+ // Then the expected result should be empty, but if the predicate age = 25 is
+ // pushed down into the parquet reader, the base file record is filtered out
+ // and the result would wrongly be (uuid: 'k1', age: 25, ts: 1001).
+ TableEnvironment tableEnv = batchTableEnv;
+ String path = tempFile.getAbsolutePath();
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+ .option(FlinkOptions.COMPACTION_TASKS, 1)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String INSERT_T1 = "insert into t1 values\n"
+ + "('id1','Danny',23,TIMESTAMP '1970-01-01 01:00:01','par1')\n";
+ execInsertSql(tableEnv, INSERT_T1);
+
+ batchTableEnv.executeSql("drop table t1");
+
+ hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.METADATA_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String INSERT_T2 = "insert into t1 values\n"
+ + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:00:01','par1')\n";
+ execInsertSql(tableEnv, INSERT_T2);
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where age = 25 and
`partition` = 'par1'").execute().collect());
+ assertRowsEquals(result1, "[]");
+ }
+
@Test
void testParquetLogBlockDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;