soin08 commented on code in PR #28566:
URL: https://github.com/apache/flink/pull/28566#discussion_r3497393797
##########
docs/content/docs/libs/state_processor_api.md:
##########
@@ -226,9 +226,113 @@ Along with reading registered state values, each key has access to a `Context` w
**Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of open. Any attempt to call a `RuntimeContext#get*State` will result in a `RuntimeException`.
+##### Filtering Keys
+
+When you are only interested in state for a subset of keys, you can push a `SavepointKeyFilter` into the read instead of reading the whole keyed state and filtering afterward.
+
+Depending on its type, a filter skips unnecessary reads at two levels:
+
+* **Split pruning** — when the filter resolves to an exact set of keys (`SavepointKeyFilter.exact(...)`), Flink hashes those keys up front to find the few key groups that own them, and skips every other input split without ever opening it. This is the largest saving, but it only works for exact keys.
+* **Key-level filtering** — within each split, every key is tested and non-matching keys are skipped before their state is read. This applies to all filters; a range filter relies on this level alone, since it cannot prune splits.
+
+```java
+import org.apache.flink.state.api.filter.SavepointKeyFilter;
+
+import java.util.Set;
+
+// Read the state for a single key
+DataStream<KeyedState> singleKey = savepoint.readKeyedState(
+    OperatorIdentifier.forUid("my-uid"),
+    new ReaderFunction(),
+    Types.INT,
+    TypeInformation.of(KeyedState.class),
+    SavepointKeyFilter.exact(42));
+
+// Read the state for a finite set of keys
+DataStream<KeyedState> someKeys = savepoint.readKeyedState(
+    OperatorIdentifier.forUid("my-uid"),
+    new ReaderFunction(),
+    Types.INT,
+    TypeInformation.of(KeyedState.class),
+    SavepointKeyFilter.exact(Set.of(1, 2, 3)));
+
+// Read the state for a key range: 10 <= key < 100
+DataStream<KeyedState> keyRange = savepoint.readKeyedState(
+    OperatorIdentifier.forUid("my-uid"),
+    new ReaderFunction(),
+    Types.INT,
+    TypeInformation.of(KeyedState.class),
+    SavepointKeyFilter.range(10, true, 100, false));
+```
+
+`SavepointKeyFilter` provides the following factory methods:
+
+* `SavepointKeyFilter.exact(Object key)` / `SavepointKeyFilter.exact(Set<Object> keys)` — match a single key or a finite set of keys.
+* `SavepointKeyFilter.range(Comparable<?> lower, boolean lowerInclusive, Comparable<?> upper, boolean upperInclusive)` — match a range. Either bound may be `null` to leave that side unbounded.
+* `SavepointKeyFilter.empty()` — match no keys. Not intended for direct use; it only serves as an internal building block for the Table API filter pushdown.
+
+When the built-in filters are not enough, you can implement the `SavepointKeyFilter` interface yourself.
+For use with the DataStream API, only `test(Object key)` has to be implemented; it is called for every key in each opened split and decides whether that key will be read.
+
+```java
+// Reads the state only for even keys
+public class EvenKeyFilter implements SavepointKeyFilter {
+
+    @Override
+    public boolean test(Object key) {
+        return ((Integer) key) % 2 == 0;
+    }
+}
+
+DataStream<KeyedState> evenState = savepoint.readKeyedState(
+    OperatorIdentifier.forUid("my-uid"),
+    new ReaderFunction(),
+    Types.INT,
+    TypeInformation.of(KeyedState.class),
+    new EvenKeyFilter());
+```
+
+By default, a custom filter only enables key-level filtering: every split is still opened.
+If your filter resolves to a finite set of keys, additionally override `getExactKeys()` to return them; Flink then prunes the splits whose key groups cannot contain any of those keys, just like the built-in `exact(...)` filter.
+
+```java
+import java.util.HashSet;
+import java.util.Set;
+
+// Reads the state only for keys 0 to max (inclusive)
+public class UpToKeyFilter implements SavepointKeyFilter {
+
+    private final int max;
+
+    public UpToKeyFilter(int max) {
+        this.max = max;
+    }
+
+    @Override
+    public boolean test(Object key) {
+        int k = (Integer) key;
+        return k >= 0 && k <= max;
+    }
+
+    @Override
+    public Set<Object> getExactKeys() {
+        // Enumerate the finite key set so Flink can prune splits up front
+        Set<Object> keys = new HashSet<>();
+        for (int k = 0; k <= max; k++) {
+            keys.add(k);
+        }
+        return keys;
+    }
+}
+
+DataStream<KeyedState> firstKeys = savepoint.readKeyedState(
+    OperatorIdentifier.forUid("my-uid"),
+    new ReaderFunction(),
+    Types.INT,
+    TypeInformation.of(KeyedState.class),
+    new UpToKeyFilter(100));
+```
+
+The remaining interface methods can be left at their defaults for DataStream API usage, as they are only used internally by the Table API during pushdown handling.
Review Comment:
> If you plan to use a custom filter with the Table API

I think this statement may confuse users, since there is no way for a user to use a custom filter with the Table API.
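
The two filtering levels described in the docs hunk above (split pruning for exact keys, key-level filtering for every filter) can be modeled in plain Java. This is a minimal, hypothetical sketch, not Flink code: `KeyFilter`, `Split`, `ReadResult`, `keyGroupOf`, and `read` are illustrative names, and `keyGroupOf` uses a plain modulo of `hashCode()` as a stand-in for Flink's actual key-group assignment. It shows why an exact filter can skip whole splits while a range filter still opens every split and tests each key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical, self-contained model of two-level key filtering; this is not Flink code. */
public class TwoLevelFilterSketch {

    /** Simplified stand-in for SavepointKeyFilter; null exact keys means "splits cannot be pruned". */
    interface KeyFilter {
        boolean test(Object key);

        default Set<Object> getExactKeys() {
            return null;
        }
    }

    /** Hypothetical split owning the inclusive key-group range [start, end] and the keys stored in it. */
    record Split(int start, int end, List<Integer> keys) {}

    /** Outcome of a read: how many splits were opened and which keys were read. */
    record ReadResult(int openedSplits, List<Integer> keys) {}

    /** Stand-in key-group assignment (assumption); Flink's real assignment hashes differently. */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Exact filter: tests membership and exposes its keys so splits can be pruned. */
    static KeyFilter exact(Set<Object> keys) {
        return new KeyFilter() {
            @Override
            public boolean test(Object key) {
                return keys.contains(key);
            }

            @Override
            public Set<Object> getExactKeys() {
                return keys;
            }
        };
    }

    /** Range filter; a null bound leaves that side unbounded. Only key-level filtering applies. */
    static KeyFilter range(Integer lower, boolean lowerInclusive, Integer upper, boolean upperInclusive) {
        return key -> {
            int k = (Integer) key;
            boolean aboveLower = lower == null || (lowerInclusive ? k >= lower : k > lower);
            boolean belowUpper = upper == null || (upperInclusive ? k <= upper : k < upper);
            return aboveLower && belowUpper;
        };
    }

    static ReadResult read(KeyFilter filter, List<Split> splits, int maxParallelism) {
        Set<Object> exactKeys = filter.getExactKeys();
        int opened = 0;
        List<Integer> readKeys = new ArrayList<>();
        for (Split split : splits) {
            // Level 1 (split pruning): with exact keys, skip splits owning none of their key groups.
            if (exactKeys != null
                    && exactKeys.stream()
                            .map(k -> keyGroupOf(k, maxParallelism))
                            .noneMatch(kg -> kg >= split.start() && kg <= split.end())) {
                continue;
            }
            opened++;
            // Level 2 (key-level filtering): test every key in the opened split.
            for (Integer key : split.keys()) {
                if (filter.test(key)) {
                    readKeys.add(key);
                }
            }
        }
        return new ReadResult(opened, readKeys);
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        List<Split> splits = List.of(
                new Split(0, 3, List.of(0, 1, 2, 3, 8, 9, 10, 11)),
                new Split(4, 7, List.of(4, 5, 6, 7, 12, 13, 14, 15)));
        // Keys 1 and 9 both map to key group 1, so only the first split is opened.
        // prints "ReadResult[openedSplits=1, keys=[1, 9]]"
        System.out.println(read(exact(Set.of(1, 9)), splits, maxParallelism));
        // A range 5 <= key < 12 cannot prune, so both splits are opened and every key is tested.
        // prints "ReadResult[openedSplits=2, keys=[8, 9, 10, 11, 5, 6, 7]]"
        System.out.println(read(range(5, true, 12, false), splits, maxParallelism));
    }
}
```

Keeping `getExactKeys()` optional (returning `null` in this sketch) lets any filter work at the key level, while only filters that can enumerate their keys pay off at the split level.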