gaborgsomogyi commented on code in PR #28142:
URL: https://github.com/apache/flink/pull/28142#discussion_r3258159959
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
##########
@@ -131,10 +161,36 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
@Override
public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+ if (keyFilter instanceof SavepointKeyFilter.EmptyKeyFilter) {
+ return new KeyGroupRangeInputSplit[0];
+ }
+
final int maxParallelism = operatorState.getMaxParallelism();
final List<KeyGroupRange> keyGroups = sortedKeyGroupRanges(minNumSplits, maxParallelism);
+ // Key-group pruning is only possible for exact key equality filters.
+ // For each key we compute its key-group, then emit only the splits
+ // whose KeyGroupRange contains at least one matching key-group.
+ if (keyFilter instanceof SavepointKeyFilter.ExactKeyFilter) {
+ final Set<Object> exactKeys = ((SavepointKeyFilter.ExactKeyFilter) keyFilter).getKeys();
+
+ final Set<Integer> targetKeyGroups = new HashSet<>();
+ for (Object key : exactKeys) {
+ targetKeyGroups.add(KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism));
+ }
+
+ final List<KeyGroupRangeInputSplit> prunedSplits = new ArrayList<>();
+ for (int i = 0; i < keyGroups.size(); i++) {
+ KeyGroupRange range = keyGroups.get(i);
+ if (rangeContainsAny(range, targetKeyGroups)) {
+ prunedSplits.add(
+ createKeyGroupRangeInputSplit(operatorState, maxParallelism, range, i));
+ }
+ }
+ return prunedSplits.toArray(new KeyGroupRangeInputSplit[0]);
+ }
Review Comment:
`createInputSplits` is deciding whether split pruning is possible by
inspecting the concrete type of the filter — that is filter knowledge, not
input format knowledge. Every new prunable filter type forces a new branch here.
The decision belongs on the filter itself:
```java
// SavepointKeyFilter — new default method
@Nullable default Set<Object> getExactKeys() { return null; }
// ExactKeyFilter overrides to return its keys
// EmptyKeyFilter overrides to return Collections.emptySet()
// RangeKeyFilter uses the default (null = no split pruning possible)
```
Then `createInputSplits` becomes type-blind and the `EmptyKeyFilter` special
case disappears — an empty key set naturally produces zero target key groups,
so no splits pass the range check:
```java
@Override
public KeyGroupRangeInputSplit[] createInputSplits(int minNumSplits) throws IOException {
    final int maxParallelism = operatorState.getMaxParallelism();
    final List<KeyGroupRange> keyGroups = sortedKeyGroupRanges(minNumSplits, maxParallelism);
    if (keyFilter != null) {
        Set<Object> exactKeys = keyFilter.getExactKeys();
        if (exactKeys != null) {
            return pruneByExactKeys(keyGroups, exactKeys, maxParallelism);
        }
    }
    return CollectionUtil.mapWithIndex(
                    keyGroups,
                    (keyGroupRange, index) ->
                            createKeyGroupRangeInputSplit(
                                    operatorState, maxParallelism, keyGroupRange, index))
            .toArray(KeyGroupRangeInputSplit[]::new);
}
```
The three return paths, two of them gated by `instanceof`, collapse to two
type-blind ones: exact-key pruning, or everything.
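The `pruneByExactKeys` helper referenced above is not spelled out, so here is a minimal sketch of what it could look like. Everything in it is an assumption made for illustration: `Range` stands in for `KeyGroupRange`, the local `assignToKeyGroup` stands in for `KeyGroupRangeAssignment.assignToKeyGroup` (which hashes differently), and the method returns the surviving split indices instead of `KeyGroupRangeInputSplit` objects so that it compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: simplified stand-ins replace Flink's key-group classes. */
final class SplitPruningSketch {

    /** Stand-in for KeyGroupRange: an inclusive [start, end] interval of key-group indices. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int keyGroup) {
            return keyGroup >= start && keyGroup <= end;
        }
    }

    /** Stand-in for KeyGroupRangeAssignment.assignToKeyGroup; the real hashing differs. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Returns the indices of the ranges that hold the key group of at least one exact key. */
    static List<Integer> pruneByExactKeys(
            List<Range> ranges, Set<?> exactKeys, int maxParallelism) {
        Set<Integer> targetKeyGroups = new HashSet<>();
        for (Object key : exactKeys) {
            targetKeyGroups.add(assignToKeyGroup(key, maxParallelism));
        }
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < ranges.size(); i++) {
            Range range = ranges.get(i);
            // Record the original index so split numbering stays stable after pruning.
            if (targetKeyGroups.stream().anyMatch(range::contains)) {
                kept.add(i);
            }
        }
        return kept;
    }
}
```

An empty key set yields an empty `targetKeyGroups`, so no range is kept and the `EmptyKeyFilter` case needs no branch of its own.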
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java:
##########
@@ -229,6 +289,48 @@ public OUT nextRecord(OUT reuse) throws IOException {
return out.next();
}
+ private static <K, N> CloseableIterator<Tuple2<K, N>> applyKeyFilter(
+ CloseableIterator<Tuple2<K, N>> source, SavepointKeyFilter filter) {
+ return new CloseableIterator<Tuple2<K, N>>() {
+ @Nullable private Tuple2<K, N> pending = null;
+
+ @Override
+ public boolean hasNext() {
+ while (pending == null && source.hasNext()) {
+ Tuple2<K, N> candidate = source.next();
+ if (filter.test(candidate.f0)) {
+ pending = candidate;
+ }
+ }
+ return pending != null;
+ }
+
+ @Override
+ public Tuple2<K, N> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ Tuple2<K, N> result = pending;
+ pending = null;
+ return result;
+ }
+
+ @Override
+ public void close() throws Exception {
+ source.close();
+ }
+ };
+ }
Review Comment:
`KeyedStateInputFormat` should express that filtering must happen, not how.
This anonymous filtering iterator is a standalone concern that does not belong
here.
A search of the codebase confirms there is no existing `CloseableIterator`
filter utility in Flink — `CloseableIterator` itself only has
`adapterForIterator`, `flatten`, `empty`, and `fromList` factory methods. This
pattern should be introduced as a dedicated class, for example:
```java
// flink-state-processing-api, input package
class FilteringCloseableIterator<T> implements CloseableIterator<T> {
    private final CloseableIterator<T> source;
    private final Predicate<T> predicate;
    // ...
}
```
It should come with its own unit test class covering: predicate match,
predicate skip, empty source, close delegation, and `hasNext` called multiple
times. The call site in `open()` then just reads:
```java
keysAndNamespaces = new FilteringCloseableIterator<>(
operator.getKeysAndNamespaces(ctx),
tuple -> keyFilter.test(tuple.f0));
```
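To make the proposal concrete, here is one way the full class could be written. This is a sketch under assumptions, not the PR's code: the local `CloseableIterator` interface is a stand-in for `org.apache.flink.util.CloseableIterator` so the example compiles on its own, and the `hasPending` flag is my addition so that a `null` element accepted by the predicate is not mistaken for "nothing buffered" (the anonymous iterator in the diff uses `pending == null` for that).

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Stand-in for org.apache.flink.util.CloseableIterator so the sketch compiles on its own. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {}

/** Hypothetical class from the review: lazily skips elements rejected by the predicate. */
final class FilteringCloseableIterator<T> implements CloseableIterator<T> {
    private final CloseableIterator<T> source;
    private final Predicate<? super T> predicate;
    private T pending;
    private boolean hasPending;

    FilteringCloseableIterator(CloseableIterator<T> source, Predicate<? super T> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    @Override
    public boolean hasNext() {
        // Idempotent: once a match is buffered, repeated calls do not advance the source.
        while (!hasPending && source.hasNext()) {
            T candidate = source.next();
            if (predicate.test(candidate)) {
                pending = candidate;
                hasPending = true;
            }
        }
        return hasPending;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = pending;
        pending = null;
        hasPending = false;
        return result;
    }

    @Override
    public void close() throws Exception {
        // Closing is delegated unchanged; this class owns no resources itself.
        source.close();
    }
}
```

Each of the test cases listed above maps onto one branch of this class, which is part of the argument for extracting it.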
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java:
##########
@@ -57,13 +62,44 @@ public SavepointDynamicTableSource(
@Override
public DynamicTableSource copy() {
- return new SavepointDynamicTableSource(
- stateBackendType,
- statePath,
- operatorIdentifier,
- keyTypeInfo,
- keyValueProjections,
- rowType);
+ SavepointDynamicTableSource copy =
+ new SavepointDynamicTableSource(
+ stateBackendType,
+ statePath,
+ operatorIdentifier,
+ keyTypeInfo,
+ keyValueProjections,
+ rowType);
+ copy.keyFilter = this.keyFilter;
+ return copy;
+ }
+
+ @Override
+ public Result applyFilters(List<ResolvedExpression> filters) {
+ final int keyColumnIndex = keyValueProjections.f0;
+
+ // TODO: support composite key types in filter pushdown. Such keys will be passed as Rows
+ if (rowType.getFields().get(keyColumnIndex).getType() instanceof RowType) {
+ return Result.of(Collections.emptyList(), filters);
+ }
+
+ final List<ResolvedExpression> accepted = new ArrayList<>();
+ final List<ResolvedExpression> remaining = new ArrayList<>();
+
+ for (ResolvedExpression filter : filters) {
+ SavepointKeyFilter extracted = SavepointFilters.extractFilter(filter, keyColumnIndex);
+ if (extracted != null) {
+ keyFilter =
+ (keyFilter == null)
+ ? extracted
+ : SavepointFilters.intersect(keyFilter, extracted);
+ accepted.add(filter);
+ } else {
+ remaining.add(filter);
+ }
+ }
Review Comment:
`applyFilters` is responsible for negotiating the accepted/remaining split
with the planner — that is its single job. The iteration, null guard, and
`intersect` call are all details of *how* a key filter is built from
expressions; they belong in `SavepointFilters`, not here.
Suggested shape:
```java
SavepointFilters.Result result = SavepointFilters.apply(filters, keyColumnIndex);
keyFilter = result.keyFilter();
return Result.of(result.accepted(), result.remaining());
```
As a side effect, `intersect` becomes a private implementation detail of
`SavepointFilters` rather than a package-visible method.
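A rough sketch of what the body of such an `apply` method might contain follows. It is generic on purpose so it runs without Flink: `E` stands in for `ResolvedExpression`, `F` for `SavepointKeyFilter`, and the `extract` and `intersect` parameters stand in for `SavepointFilters.extractFilter` and the now-private `intersect`. The class name, the `Result` shape, and the parameter list are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Sketch of a hypothetical SavepointFilters.apply, written against stand-in types. */
final class FilterNegotiationSketch {

    /** Hypothetical result holder: the combined filter plus the accepted/remaining split. */
    static final class Result<E, F> {
        private final F keyFilter; // null when no expression could be pushed down
        private final List<E> accepted;
        private final List<E> remaining;

        Result(F keyFilter, List<E> accepted, List<E> remaining) {
            this.keyFilter = keyFilter;
            this.accepted = accepted;
            this.remaining = remaining;
        }

        F keyFilter() { return keyFilter; }
        List<E> accepted() { return accepted; }
        List<E> remaining() { return remaining; }
    }

    /** extract returns null for expressions that cannot be turned into a key filter. */
    static <E, F> Result<E, F> apply(
            List<E> filters, Function<E, F> extract, BinaryOperator<F> intersect) {
        F combined = null;
        List<E> accepted = new ArrayList<>();
        List<E> remaining = new ArrayList<>();
        for (E filter : filters) {
            F extracted = extract.apply(filter);
            if (extracted == null) {
                remaining.add(filter);
                continue;
            }
            // AND semantics: every accepted expression narrows the combined filter.
            combined = (combined == null) ? extracted : intersect.apply(combined, extracted);
            accepted.add(filter);
        }
        return new Result<>(combined, accepted, remaining);
    }
}
```

One behavioural difference worth checking: this sketch starts from no filter on every call, whereas the loop in the diff accumulates into the `keyFilter` field across calls.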
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointKeyFilter.java:
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/** Represents a key filter that can be pushed down into a savepoint scan. */
+@Internal
+public interface SavepointKeyFilter extends Serializable {
Review Comment:
This file carries the right idea but is in the wrong place, at the wrong
granularity, and exposes too little behaviour through its interface — three
connected problems.
**1. Package placement.** `SavepointKeyFilter` lives in `state.table` but is
already used by `SavepointReader` and `KeyedStateInputFormat`, both core state
processing API classes. The table layer should depend on the state API, not the
other way around. More importantly, a key filter is a general state processing
primitive: once it lives in `state.table` it is invisible to users of the
streaming State Processing API who never touch SQL. The entire filter
infrastructure belongs in a dedicated package such as
`org.apache.flink.state.api.filter`. The dependency arrows then become clean:
```
state.api.filter (no outward deps)
state.api → state.api.filter
state.api.input → state.api.filter
state.table → state.api.filter
```
**2. Interface size.** The interface should be the API surface only. The
three implementation classes (`EmptyKeyFilter`, `ExactKeyFilter`,
`RangeKeyFilter`) and `BoundComparison` are the *how*; they should be
package-private classes in separate files inside `state.api.filter`, invisible
to callers.
**3. Insufficient interface behaviour — causes 10 `instanceof` checks across
the codebase.** The interface only declares `test()`, so every caller that
needs type-specific behaviour is forced to inspect the concrete type. Every
`instanceof SavepointKeyFilter.X` is a place where the interface should have
had a method:
```java
// eliminates 3× instanceof EmptyKeyFilter
default boolean isEmpty() { return false; }
// eliminates 4× instanceof ExactKeyFilter (split pruning + intersect)
@Nullable default Set<Object> getExactKeys() { return null; }
// eliminates the 4-check dispatch matrix in SavepointFilters.intersect()
SavepointKeyFilter intersect(SavepointKeyFilter other);
```
With `intersect` on the interface each implementation encapsulates its own
combination logic. `fromAnd` in `SavepointFilters` can call `intersect`
directly without checking types at all. The 10 external `instanceof` checks
collapse to zero.
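As a rough illustration of how `intersect` can work without type inspection, consider the sketch below. The names `KeyFilter`, `EmptyFilter`, and `ExactFilter` are stand-ins for the PR's classes, and the bodies are assumptions. In particular, the review declares `intersect` as abstract; the default body here (defer to an enumerable operand, otherwise build a conjunction) is my own simplification and does not merge range bounds the way a real `RangeKeyFilter` presumably would.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Stand-in for the proposed SavepointKeyFilter surface (Serializable omitted for brevity). */
interface KeyFilter {
    boolean test(Object key);

    default boolean isEmpty() {
        return false;
    }

    /** Returns the finite key set, or null when the filter cannot enumerate its keys. */
    default Set<Object> getExactKeys() {
        return null;
    }

    /**
     * Assumed default: an enumerable operand drives the intersection, otherwise fall back to
     * a conjunction. Filters that return non-null exact keys must override this method.
     */
    default KeyFilter intersect(KeyFilter other) {
        if (other.getExactKeys() != null) {
            return other.intersect(this);
        }
        KeyFilter self = this;
        return key -> self.test(key) && other.test(key);
    }
}

final class EmptyFilter implements KeyFilter {
    @Override public boolean test(Object key) { return false; }
    @Override public boolean isEmpty() { return true; }
    @Override public Set<Object> getExactKeys() { return Collections.emptySet(); }
    // Nothing intersected with anything is still nothing.
    @Override public KeyFilter intersect(KeyFilter other) { return this; }
}

final class ExactFilter implements KeyFilter {
    private final Set<Object> keys;

    ExactFilter(Set<Object> keys) {
        this.keys = keys;
    }

    @Override public boolean test(Object key) { return keys.contains(key); }
    @Override public Set<Object> getExactKeys() { return keys; }

    @Override
    public KeyFilter intersect(KeyFilter other) {
        // Keep only the keys the other filter also accepts; its concrete type is irrelevant.
        Set<Object> kept = new HashSet<>();
        for (Object key : keys) {
            if (other.test(key)) {
                kept.add(key);
            }
        }
        return kept.isEmpty() ? new EmptyFilter() : new ExactFilter(kept);
    }
}
```

Because the exact side only ever calls `other.test(key)`, the result is the same whichever operand `intersect` is invoked on, and an intersection that matches nothing reports `isEmpty()` without any caller checking for a concrete class.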