JingsongLi commented on code in PR #280:
URL: https://github.com/apache/flink-table-store/pull/280#discussion_r959231586
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java:
##########
@@ -87,34 +108,78 @@ public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
-        this.filters = splitAnd(predicate);
+        allFilters = new ArrayList<>();
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
+        for (Predicate sub : splitAnd(predicate)) {
+            allFilters.add(sub);
+            if (!containsFields(sub, nonPrimaryKeys)) {
+                if (keyFilters == null) {
+                    keyFilters = new ArrayList<>();
+                }
+                // TODO Actually, the index is wrong, but it is OK.
+                // The orc filter just use name instead of index.
+                keyFilters.add(sub);
+            }
+        }
         return this;
     }

     @Override
     public RecordReader<KeyValue> createReader(Split split) throws IOException {
         if (split.isIncremental()) {
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
+            // incremental mode cannot push down value filters, because the update for the same key
+            // may occur in the next split
+            DataFileReader dataFileReader = createDataFileReader(split, true, false);
             // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : split.files()) {
-                suppliers.add(
-                        () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                if (acceptFilter(false).test(file)) {
+                    suppliers.add(
+                            () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                }
             }
             return ConcatRecordReader.create(suppliers);
         } else {
             // in this case merge tree should merge records with same key
             // Do not project key in MergeTreeReader.
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
-            MergeTreeReader reader =
-                    new MergeTreeReader(
-                            new IntervalPartition(split.files(), keyComparator).partition(),
-                            true,
-                            dataFileReader,
-                            keyComparator,
-                            mergeFunction.copy());
+            List<List<SortedRun>> sections =
+                    new IntervalPartition(split.files(), keyComparator).partition();
+            DataFileReader dataFileReaderWithAllFilters = createDataFileReader(split, false, true);
+            DataFileReader dataFileReaderWithKeyFilters = createDataFileReader(split, false, false);
+            MergeFunction mergeFunc = mergeFunction.copy();
+            List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+            for (List<SortedRun> section : sections) {
+                // if key ranges do not have overlap, value filter can be pushed down as well
+                boolean acceptAll = section.size() == 1;
+                List<SortedRun> hitSection = new ArrayList<>();
+                for (SortedRun run : section) {
+                    List<DataFileMeta> hitFiles = new ArrayList<>();
+                    for (DataFileMeta file : run.files()) {

Review Comment:
   Maybe using `Stream.filter` would be better to read.
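As a rough illustration of the suggestion, the manual `hitFiles` accumulator in the innermost loop could be expressed with `Stream.filter`. This is only a sketch against the quoted hunk: it assumes `acceptFilter(acceptAll)` exposes a `test(DataFileMeta)` method (matching its use as `acceptFilter(false).test(file)` in the incremental branch), and it leaves out whatever follows the filtering step, since that part is not shown in the hunk.

```java
// Rough sketch; would drop into the merge branch of createReader.
// Requires java.util.stream.Collectors; acceptFilter(acceptAll) is assumed to
// expose a test(DataFileMeta) method, as in the incremental branch above.
for (SortedRun run : section) {
    List<DataFileMeta> hitFiles =
            run.files().stream()
                    .filter(file -> acceptFilter(acceptAll).test(file))
                    .collect(Collectors.toList());
    // hitFiles would then feed into the hitSection handling that follows in
    // the PR (not visible in the quoted hunk).
}
```

Whether this reads better than the explicit loops is a style call; it mainly removes the hand-rolled accumulator for `hitFiles` while keeping the section-level `acceptAll` decision unchanged.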