JingsongLi commented on code in PR #280:
URL: https://github.com/apache/flink-table-store/pull/280#discussion_r959231586


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java:
##########
@@ -87,34 +108,78 @@ public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
 
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
-        this.filters = splitAnd(predicate);
+        allFilters = new ArrayList<>();
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
+        for (Predicate sub : splitAnd(predicate)) {
+            allFilters.add(sub);
+            if (!containsFields(sub, nonPrimaryKeys)) {
+                if (keyFilters == null) {
+                    keyFilters = new ArrayList<>();
+                }
+                // TODO Actually, the index is wrong, but it is OK.
+                //  The orc filter just use name instead of index.
+                keyFilters.add(sub);
+            }
+        }
         return this;
     }
 
     @Override
     public RecordReader<KeyValue> createReader(Split split) throws IOException {
         if (split.isIncremental()) {
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
+            // incremental mode cannot push down value filters, because the update for the same key
+            // may occur in the next split
+            DataFileReader dataFileReader = createDataFileReader(split, true, false);
             // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : split.files()) {
-                suppliers.add(
-                        () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                if (acceptFilter(false).test(file)) {
+                    suppliers.add(
+                            () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                }
             }
             return ConcatRecordReader.create(suppliers);
         } else {
             // in this case merge tree should merge records with same key
             // Do not project key in MergeTreeReader.
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
-            MergeTreeReader reader =
-                    new MergeTreeReader(
-                            new IntervalPartition(split.files(), keyComparator).partition(),
-                            true,
-                            dataFileReader,
-                            keyComparator,
-                            mergeFunction.copy());
+            List<List<SortedRun>> sections =
+                    new IntervalPartition(split.files(), keyComparator).partition();
+            DataFileReader dataFileReaderWithAllFilters = createDataFileReader(split, false, true);
+            DataFileReader dataFileReaderWithKeyFilters = createDataFileReader(split, false, false);
+            MergeFunction mergeFunc = mergeFunction.copy();
+            List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+            for (List<SortedRun> section : sections) {
+                // if key ranges do not have overlap, value filter can be pushed down as well
+                boolean acceptAll = section.size() == 1;
+                List<SortedRun> hitSection = new ArrayList<>();
+                for (SortedRun run : section) {
+                    List<DataFileMeta> hitFiles = new ArrayList<>();
+                    for (DataFileMeta file : run.files()) {

Review Comment:
   Using `Stream.filter` here would probably be easier to read.
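
   As a rough illustration only: the loop body is cut off in the hunk above, so the sketch below assumes it collects the files of a run that pass a file-level predicate. `FileMeta` is a hypothetical stand-in for `DataFileMeta`, and `accept` stands in for whatever `acceptFilter(...)` returns; neither is the actual PR code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class StreamFilterSketch {

    /** Hypothetical stand-in for DataFileMeta; only two illustrative fields are modeled. */
    static final class FileMeta {
        private final String fileName;
        private final long maxKey;

        FileMeta(String fileName, long maxKey) {
            this.fileName = fileName;
            this.maxKey = maxKey;
        }

        String fileName() {
            return fileName;
        }

        long maxKey() {
            return maxKey;
        }
    }

    /** Loop form (assumed shape of the truncated loop): keep files accepted by the predicate. */
    static List<FileMeta> hitFilesWithLoop(List<FileMeta> files, Predicate<FileMeta> accept) {
        List<FileMeta> hitFiles = new ArrayList<>();
        for (FileMeta file : files) {
            if (accept.test(file)) {
                hitFiles.add(file);
            }
        }
        return hitFiles;
    }

    /** Stream form: same result, with the filtering intent stated in a single expression. */
    static List<FileMeta> hitFilesWithStream(List<FileMeta> files, Predicate<FileMeta> accept) {
        return files.stream().filter(accept).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FileMeta> files = Arrays.asList(new FileMeta("f1", 5L), new FileMeta("f2", 50L));
        // Illustrative predicate only; the real filter would come from the pushed-down predicates.
        Predicate<FileMeta> accept = file -> file.maxKey() >= 10L;
        for (FileMeta file : hitFilesWithStream(files, accept)) {
            System.out.println(file.fileName());
        }
    }
}
```

   Both forms keep the input order, so the change would be purely about readability.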


