soin08 commented on code in PR #28590:
URL: https://github.com/apache/flink/pull/28590#discussion_r3500154812
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/SavepointKeyFilter.java:
##########
@@ -79,43 +83,44 @@ default BoundInfo getUpperBound() {
*
* <p>Used only while combining filters during push-down translation, not
during the scan.
*/
- default SavepointKeyFilter intersect(SavepointKeyFilter other) {
+ default SavepointKeyFilter<K> intersect(SavepointKeyFilter<K> other) {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support intersect()");
}
- static SavepointKeyFilter filterKeys(Set<Object> keys, SavepointKeyFilter
predicate) {
- final Set<Object> retained = new HashSet<>();
- for (Object key : keys) {
+ static <K> SavepointKeyFilter<K> filterKeys(Set<K> keys,
SavepointKeyFilter<K> predicate) {
+ final Set<K> retained = new HashSet<>();
+ for (K key : keys) {
if (predicate.test(key)) {
retained.add(key);
}
}
return exact(retained);
}
- static SavepointKeyFilter exact(Set<Object> keys) {
+ static <K> SavepointKeyFilter<K> exact(Set<K> keys) {
if (keys.isEmpty()) {
- return EmptyKeyFilter.INSTANCE;
+ return EmptyKeyFilter.instance();
}
- return new ExactKeyFilter(keys);
+ return new ExactKeyFilter<>(keys);
}
- static SavepointKeyFilter exact(Object value) {
- return new ExactKeyFilter(Set.of(value));
+ static <K> SavepointKeyFilter<K> exact(K value) {
+ return new ExactKeyFilter<>(Set.of(value));
}
- static SavepointKeyFilter range(
+ static <K> SavepointKeyFilter<K> range(
@Nullable Comparable<?> lower,
boolean lowerInclusive,
@Nullable Comparable<?> upper,
boolean upperInclusive) {
BoundInfo lowerBoundInfo = lower != null ? new BoundInfo(lower,
lowerInclusive) : null;
BoundInfo upperBoundInfo = upper != null ? new BoundInfo(upper,
upperInclusive) : null;
- return new RangeKeyFilter(lowerBoundInfo, upperBoundInfo);
+ return new RangeKeyFilter<>(lowerBoundInfo, upperBoundInfo);
}
- static SavepointKeyFilter empty() {
- return EmptyKeyFilter.INSTANCE;
+ @SuppressWarnings("unchecked")
Review Comment:
Redundant @SuppressWarnings("unchecked") — body just delegates to
instance(), which already owns the cast/suppression; empty() does no unchecked
op.
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/SavepointKeyFilter.java:
##########
@@ -79,43 +83,44 @@ default BoundInfo getUpperBound() {
*
* <p>Used only while combining filters during push-down translation, not
during the scan.
*/
- default SavepointKeyFilter intersect(SavepointKeyFilter other) {
+ default SavepointKeyFilter<K> intersect(SavepointKeyFilter<K> other) {
throw new UnsupportedOperationException(
getClass().getSimpleName() + " does not support intersect()");
}
- static SavepointKeyFilter filterKeys(Set<Object> keys, SavepointKeyFilter
predicate) {
- final Set<Object> retained = new HashSet<>();
- for (Object key : keys) {
+ static <K> SavepointKeyFilter<K> filterKeys(Set<K> keys,
SavepointKeyFilter<K> predicate) {
+ final Set<K> retained = new HashSet<>();
+ for (K key : keys) {
if (predicate.test(key)) {
retained.add(key);
}
}
return exact(retained);
}
- static SavepointKeyFilter exact(Set<Object> keys) {
+ static <K> SavepointKeyFilter<K> exact(Set<K> keys) {
if (keys.isEmpty()) {
- return EmptyKeyFilter.INSTANCE;
+ return EmptyKeyFilter.instance();
}
- return new ExactKeyFilter(keys);
+ return new ExactKeyFilter<>(keys);
}
- static SavepointKeyFilter exact(Object value) {
- return new ExactKeyFilter(Set.of(value));
+ static <K> SavepointKeyFilter<K> exact(K value) {
+ return new ExactKeyFilter<>(Set.of(value));
}
- static SavepointKeyFilter range(
+ static <K> SavepointKeyFilter<K> range(
Review Comment:
`range(...)` advertises type-safety it doesn't enforce. It returns
`SavepointKeyFilter<K>` for an unconstrained `K`, and
`BoundInfo/RangeKeyFilter.compare()`
keep the unchecked `Comparable<Object>` cast. So `K` is phantom for ranges:
```
SavepointKeyFilter<String> f = SavepointKeyFilter.range(3, true, 7, true);
// compiles
// → ClassCastException at scan time
```
Suggested shape:
```
static <K extends Comparable<K>> SavepointKeyFilter<K> range(
@Nullable K lower,
boolean lowerInclusive,
@Nullable K upper,
boolean upperInclusive)
```
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/EmptyKeyFilter.java:
##########
@@ -22,16 +22,22 @@
import java.util.Set;
/** A filter that rejects every key. */
-final class EmptyKeyFilter implements SavepointKeyFilter {
+final class EmptyKeyFilter<K> implements SavepointKeyFilter<K> {
private static final long serialVersionUID = 1L;
- static final EmptyKeyFilter INSTANCE = new EmptyKeyFilter();
+ @SuppressWarnings("rawtypes")
+ static final EmptyKeyFilter INSTANCE = new EmptyKeyFilter<>();
Review Comment:
since now the singletone is accessed via `.instance()`, is there a reason to
leave `INSTANCE` not `private`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]