gaborgsomogyi commented on code in PR #28142:
URL: https://github.com/apache/flink/pull/28142#discussion_r3258159959


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
##########
@@ -131,10 +161,36 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
 
     @Override
     public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        if (keyFilter instanceof SavepointKeyFilter.EmptyKeyFilter) {
+            return new KeyGroupRangeInputSplit[0];
+        }
+
         final int maxParallelism = operatorState.getMaxParallelism();
 
         final List<KeyGroupRange> keyGroups = sortedKeyGroupRanges(minNumSplits, maxParallelism);
 
+        // Key-group pruning is only possible for exact key equality filters.
+        // For each key we compute its key-group, then emit only the splits
+        // whose KeyGroupRange contains at least one matching key-group.
+        if (keyFilter instanceof SavepointKeyFilter.ExactKeyFilter) {
+            final Set<Object> exactKeys = ((SavepointKeyFilter.ExactKeyFilter) keyFilter).getKeys();
+
+            final Set<Integer> targetKeyGroups = new HashSet<>();
+            for (Object key : exactKeys) {
+                targetKeyGroups.add(KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+            }
+
+            final List<KeyGroupRangeInputSplit> prunedSplits = new ArrayList<>();
+            for (int i = 0; i < keyGroups.size(); i++) {
+                KeyGroupRange range = keyGroups.get(i);
+                if (rangeContainsAny(range, targetKeyGroups)) {
+                    prunedSplits.add(
+                            createKeyGroupRangeInputSplit(operatorState, maxParallelism, range, i));
+                }
+            }
+            return prunedSplits.toArray(new KeyGroupRangeInputSplit[0]);
+        }

Review Comment:
   `createInputSplits` decides whether split pruning is possible by inspecting the concrete type of the filter. That is filter knowledge, not input format knowledge: every new prunable filter type forces a new branch here.
   
   The decision belongs on the filter itself:
   
   ```java
   // SavepointKeyFilter — new default method
   @Nullable default Set<Object> getExactKeys() { return null; }
   // ExactKeyFilter overrides to return its keys
   // EmptyKeyFilter overrides to return Collections.emptySet()
   // RangeKeyFilter uses the default (null = no split pruning possible)
   ```
   
   Then `createInputSplits` becomes type-blind and the `EmptyKeyFilter` special case disappears. An empty key set naturally produces zero target key groups, so no splits pass the range check:
   
   ```java
   @Override
   public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
       final int maxParallelism = operatorState.getMaxParallelism();
       final List<KeyGroupRange> keyGroups = sortedKeyGroupRanges(minNumSplits, maxParallelism);
   
       if (keyFilter != null) {
           Set<Object> exactKeys = keyFilter.getExactKeys();
           if (exactKeys != null) {
               return pruneByExactKeys(keyGroups, exactKeys, maxParallelism);
           }
       }
   
       return CollectionUtil.mapWithIndex(
                       keyGroups,
                       (keyGroupRange, index) ->
                               createKeyGroupRangeInputSplit(
                                       operatorState, maxParallelism, keyGroupRange, index))
               .toArray(KeyGroupRangeInputSplit[]::new);
   }
   ```
   
   Three return paths, two of them selected by `instanceof`, collapse to two type-blind ones: exact-key pruning, or all splits.
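   `pruneByExactKeys` is left undefined in the suggestion. One possible shape, assuming it simply absorbs the pruning loop from the diff, is sketched here. To keep it self-contained, `SimpleKeyGroupRange` is a hypothetical stand-in for `KeyGroupRange`, the method returns the indices of the surviving ranges instead of building `KeyGroupRangeInputSplit`s, and key-group assignment is passed in as a function (in Flink that role is played by `KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)`).

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   import java.util.function.ToIntFunction;

   // Hypothetical stand-in for Flink's KeyGroupRange (inclusive start and end).
   final class SimpleKeyGroupRange {
       final int start;
       final int end;

       SimpleKeyGroupRange(int start, int end) {
           this.start = start;
           this.end = end;
       }

       boolean contains(int keyGroup) {
           return keyGroup >= start && keyGroup <= end;
       }
   }

   final class SplitPruning {
       private SplitPruning() {}

       // Returns the indices of the ranges holding at least one exact key. The original
       // index is preserved so split numbering matches the unpruned case.
       static List<Integer> pruneByExactKeys(
               List<SimpleKeyGroupRange> keyGroups,
               Set<Object> exactKeys,
               ToIntFunction<Object> assignToKeyGroup) {
           // An empty key set yields no target key groups, hence no splits.
           Set<Integer> targetKeyGroups = new HashSet<>();
           for (Object key : exactKeys) {
               targetKeyGroups.add(assignToKeyGroup.applyAsInt(key));
           }

           List<Integer> selected = new ArrayList<>();
           for (int i = 0; i < keyGroups.size(); i++) {
               SimpleKeyGroupRange range = keyGroups.get(i);
               for (int keyGroup : targetKeyGroups) {
                   if (range.contains(keyGroup)) {
                       selected.add(i);
                       break; // one matching key group is enough
                   }
               }
           }
           return selected;
       }
   }
   ```

   In the real method each surviving index `i` would still go through `createKeyGroupRangeInputSplit(operatorState, maxParallelism, range, i)`, exactly as the current loop does, and an empty key set returns nothing without a dedicated branch.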



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
##########
@@ -229,6 +289,48 @@ public OUT nextRecord(OUT reuse) throws IOException {
         return out.next();
     }
 
+    private static <K, N> CloseableIterator<Tuple2<K, N>> applyKeyFilter(
+            CloseableIterator<Tuple2<K, N>> source, SavepointKeyFilter filter) {
+        return new CloseableIterator<Tuple2<K, N>>() {
+            @Nullable private Tuple2<K, N> pending = null;
+
+            @Override
+            public boolean hasNext() {
+                while (pending == null && source.hasNext()) {
+                    Tuple2<K, N> candidate = source.next();
+                    if (filter.test(candidate.f0)) {
+                        pending = candidate;
+                    }
+                }
+                return pending != null;
+            }
+
+            @Override
+            public Tuple2<K, N> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                Tuple2<K, N> result = pending;
+                pending = null;
+                return result;
+            }
+
+            @Override
+            public void close() throws Exception {
+                source.close();
+            }
+        };
+    }

Review Comment:
   `KeyedStateInputFormat` should express that filtering must happen, not how. This anonymous filtering iterator is a standalone concern that does not belong here.
   
   A search of the codebase confirms there is no existing `CloseableIterator` filter utility in Flink: `CloseableIterator` itself only offers factory methods such as `adapterForIterator`, `flatten`, `empty`, and `fromList`. This pattern should be introduced as a dedicated class, for example:
   
   ```java
   // flink-state-processing-api, input package
   class FilteringCloseableIterator<T> implements CloseableIterator<T> {
       private final CloseableIterator<T> source;
       private final Predicate<T> predicate;
       // ...
   }
   ```
   
   Give it its own unit test class covering predicate match, predicate skip, empty source, close delegation, and `hasNext` called multiple times. The call site in `open()` then just reads:
   
   ```java
   keysAndNamespaces = new FilteringCloseableIterator<>(
           operator.getKeysAndNamespaces(ctx),
           tuple -> keyFilter.test(tuple.f0));
   ```
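   A minimal sketch of the suggested class. It declares a local `CloseableIterator` interface mirroring `org.apache.flink.util.CloseableIterator` (an `Iterator` that is also `AutoCloseable`) only so the snippet compiles on its own; in the PR it would implement Flink's interface directly. Two small departures from the anonymous class in the diff are judgment calls: the predicate is typed `Predicate<? super T>`, and a `hasMatch` flag replaces the `pending == null` sentinel so a source yielding `null` elements is still handled correctly.

   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.function.Predicate;

   // Local stand-in for org.apache.flink.util.CloseableIterator, for compilation only.
   interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {}

   // Lazily skips source elements that do not satisfy the predicate.
   final class FilteringCloseableIterator<T> implements CloseableIterator<T> {
       private final CloseableIterator<T> source;
       private final Predicate<? super T> predicate;

       // Next matching element, buffered by hasNext().
       private T buffered;
       private boolean hasMatch;

       FilteringCloseableIterator(CloseableIterator<T> source, Predicate<? super T> predicate) {
           this.source = source;
           this.predicate = predicate;
       }

       @Override
       public boolean hasNext() {
           // Idempotent: once a match is buffered, repeated calls consume nothing more.
           while (!hasMatch && source.hasNext()) {
               T candidate = source.next();
               if (predicate.test(candidate)) {
                   buffered = candidate;
                   hasMatch = true;
               }
           }
           return hasMatch;
       }

       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           T result = buffered;
           buffered = null;
           hasMatch = false;
           return result;
       }

       @Override
       public void close() throws Exception {
           // Closing the filter always closes the underlying iterator.
           source.close();
       }
   }
   ```

   The `open()` call site above stays exactly as written; the five listed test cases map directly onto `hasNext`, `next`, and `close`.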



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java:
##########
@@ -57,13 +62,44 @@ public SavepointDynamicTableSource(
 
     @Override
     public DynamicTableSource copy() {
-        return new SavepointDynamicTableSource(
-                stateBackendType,
-                statePath,
-                operatorIdentifier,
-                keyTypeInfo,
-                keyValueProjections,
-                rowType);
+        SavepointDynamicTableSource copy =
+                new SavepointDynamicTableSource(
+                        stateBackendType,
+                        statePath,
+                        operatorIdentifier,
+                        keyTypeInfo,
+                        keyValueProjections,
+                        rowType);
+        copy.keyFilter = this.keyFilter;
+        return copy;
+    }
+
+    @Override
+    public Result applyFilters(List<ResolvedExpression> filters) {
+        final int keyColumnIndex = keyValueProjections.f0;
+
+        // TODO: support composite key types in filter pushdown. Such keys will be passed as Rows
+        if (rowType.getFields().get(keyColumnIndex).getType() instanceof RowType) {
+            return Result.of(Collections.emptyList(), filters);
+        }
+
+        final List<ResolvedExpression> accepted = new ArrayList<>();
+        final List<ResolvedExpression> remaining = new ArrayList<>();
+
+        for (ResolvedExpression filter : filters) {
+            SavepointKeyFilter extracted = SavepointFilters.extractFilter(filter, keyColumnIndex);
+            if (extracted != null) {
+                keyFilter =
+                        (keyFilter == null)
+                                ? extracted
+                                : SavepointFilters.intersect(keyFilter, extracted);
+                accepted.add(filter);
+            } else {
+                remaining.add(filter);
+            }
+        }

Review Comment:
   `applyFilters` is responsible for negotiating the accepted/remaining split with the planner, and that is its single job. The iteration, null guard, and `intersect` call are all details of *how* a key filter is built from expressions; they belong in `SavepointFilters`, not here.
   
   Suggested shape:
   
   ```java
   SavepointFilters.Result result = SavepointFilters.apply(filters, keyColumnIndex);
   keyFilter = result.keyFilter();
   return Result.of(result.accepted(), result.remaining());
   ```
   
   As a side effect, `intersect` becomes a private implementation detail of `SavepointFilters` rather than a package-visible method.
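   A possible shape for `SavepointFilters.apply` and its `Result`, sketched under a few assumptions. It is generic over the expression type so it compiles without the Table API: in the PR, `E` would be `ResolvedExpression`, and the `extractor` argument (hypothetical, not part of the suggested signature) stands in for `expr -> extractFilter(expr, keyColumnIndex)`. `KeyFilter` is a stand-in for `SavepointKeyFilter` that assumes filters can intersect themselves, so only the null guard and the call live in the private `intersect` helper.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.function.Function;

   // Stand-in for SavepointKeyFilter; assumes filters can intersect themselves.
   interface KeyFilter {
       KeyFilter intersect(KeyFilter other);
   }

   final class SavepointFilters {
       private SavepointFilters() {}

       // What applyFilters needs back: the combined filter plus the planner split.
       static final class Result<E> {
           private final KeyFilter keyFilter; // null when nothing was pushed down
           private final List<E> accepted;
           private final List<E> remaining;

           Result(KeyFilter keyFilter, List<E> accepted, List<E> remaining) {
               this.keyFilter = keyFilter;
               this.accepted = Collections.unmodifiableList(accepted);
               this.remaining = Collections.unmodifiableList(remaining);
           }

           KeyFilter keyFilter() {
               return keyFilter;
           }

           List<E> accepted() {
               return accepted;
           }

           List<E> remaining() {
               return remaining;
           }
       }

       // Hypothetical signature: extractor returns null for expressions that
       // cannot become a key filter.
       static <E> Result<E> apply(List<E> filters, Function<E, KeyFilter> extractor) {
           KeyFilter combined = null;
           List<E> accepted = new ArrayList<>();
           List<E> remaining = new ArrayList<>();
           for (E filter : filters) {
               KeyFilter extracted = extractor.apply(filter);
               if (extracted != null) {
                   combined = intersect(combined, extracted);
                   accepted.add(filter);
               } else {
                   remaining.add(filter);
               }
           }
           return new Result<>(combined, accepted, remaining);
       }

       // Private detail: the null guard applyFilters currently performs inline.
       private static KeyFilter intersect(KeyFilter current, KeyFilter extracted) {
           return current == null ? extracted : current.intersect(extracted);
       }
   }
   ```

   The composite-key early return in `applyFilters` stays where it is; only the loop and the combination logic move.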



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointKeyFilter.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/** Represents a key filter that can be pushed down into a savepoint scan. */
+@Internal
+public interface SavepointKeyFilter extends Serializable {

Review Comment:
   This file carries the right idea but is in the wrong place, at the wrong granularity, and exposes too little behaviour through its interface. These are three connected problems.
   
   **1. Package placement.** `SavepointKeyFilter` lives in `state.table` but is already used by `SavepointReader` and `KeyedStateInputFormat`, both core state processing API classes. The table layer should depend on the state API, not the other way around. More importantly, a key filter is a general state processing primitive: once it lives in `state.table`, it is invisible to users of the streaming State Processing API who never touch SQL. The entire filter infrastructure belongs in a dedicated package such as `org.apache.flink.state.api.filter`. The dependency arrows then become clean:
   
   ```
   state.api.filter   (no outward deps)
   state.api          → state.api.filter
   state.api.input    → state.api.filter
   state.table        → state.api.filter
   ```
   
   **2. Interface size.** The interface should be the API surface only. The three implementation classes (`EmptyKeyFilter`, `ExactKeyFilter`, `RangeKeyFilter`) and `BoundComparison` are the *how*; they should be package-private classes in separate files inside `state.api.filter`, invisible to callers.
   
   **3. Insufficient interface behaviour, which causes 10 `instanceof` checks across the codebase.** The interface only declares `test()`, so every caller that needs type-specific behaviour is forced to inspect the concrete type. Every `instanceof SavepointKeyFilter.X` is a place where the interface should have had a method:
   
   ```java
   // eliminates 3× instanceof EmptyKeyFilter
   default boolean isEmpty() { return false; }
   
   // eliminates 4× instanceof ExactKeyFilter (split pruning + intersect)
   @Nullable default Set<Object> getExactKeys() { return null; }
   
   // eliminates the 4-check dispatch matrix in SavepointFilters.intersect()
   SavepointKeyFilter intersect(SavepointKeyFilter other);
   ```
   
   With `intersect` on the interface, each implementation encapsulates its own combination logic. `fromAnd` in `SavepointFilters` can call `intersect` directly without checking types at all. The 10 external `instanceof` checks collapse to zero.
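   A sketch of how the widened interface and two of its implementations could look, assuming `test` takes the key as an `Object` (the diff only shows `filter.test(candidate.f0)`). `getExactKeys` would carry `@Nullable` in Flink; the annotation is dropped here to avoid the `javax.annotation` dependency. `RangeKeyFilter` and `BoundComparison` are omitted because their fields do not appear in this excerpt. The point to notice is `ExactKeyFilter.intersect`: it only calls `other.test(key)`, so it combines with any filter type without an `instanceof`.

   ```java
   import java.io.Serializable;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;

   // Sketch of the widened interface: the only public surface of the filter package.
   interface SavepointKeyFilter extends Serializable {

       // Assumption: keys are tested as plain objects, as in filter.test(candidate.f0).
       boolean test(Object key);

       // Replaces instanceof EmptyKeyFilter checks.
       default boolean isEmpty() {
           return false;
       }

       // Exact keys if known, else null (would be @Nullable): no split pruning possible.
       default Set<Object> getExactKeys() {
           return null;
       }

       // Each implementation owns its combination logic.
       SavepointKeyFilter intersect(SavepointKeyFilter other);
   }

   // Package-private implementation; matches no key at all.
   final class EmptyKeyFilter implements SavepointKeyFilter {
       static final EmptyKeyFilter INSTANCE = new EmptyKeyFilter();

       private EmptyKeyFilter() {}

       @Override
       public boolean test(Object key) {
           return false;
       }

       @Override
       public boolean isEmpty() {
           return true;
       }

       // An empty key set yields zero target key groups, so every split is pruned.
       @Override
       public Set<Object> getExactKeys() {
           return Collections.emptySet();
       }

       @Override
       public SavepointKeyFilter intersect(SavepointKeyFilter other) {
           return this;
       }

       // Keep the singleton unique across Java serialization.
       private Object readResolve() {
           return INSTANCE;
       }
   }

   // Package-private implementation; matches a fixed set of keys.
   final class ExactKeyFilter implements SavepointKeyFilter {
       private final Set<Object> keys;

       ExactKeyFilter(Set<Object> keys) {
           this.keys = Collections.unmodifiableSet(new HashSet<>(keys));
       }

       @Override
       public boolean test(Object key) {
           return keys.contains(key);
       }

       @Override
       public Set<Object> getExactKeys() {
           return keys;
       }

       // Keeps the exact keys the other filter also accepts. Only other.test() is
       // used, so this works for any filter type without instanceof.
       @Override
       public SavepointKeyFilter intersect(SavepointKeyFilter other) {
           Set<Object> kept = new HashSet<>();
           for (Object key : keys) {
               if (other.test(key)) {
                   kept.add(key);
               }
           }
           return kept.isEmpty() ? EmptyKeyFilter.INSTANCE : new ExactKeyFilter(kept);
       }
   }
   ```

   For example, intersecting exact keys `{1, 2, 3}` with `{2, 3, 4}` yields an exact filter on `{2, 3}`, while a disjoint intersection yields the empty filter, whose empty `getExactKeys()` set prunes every split in `createInputSplits`.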



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
