gaborgsomogyi commented on code in PR #28590:
URL: https://github.com/apache/flink/pull/28590#discussion_r3506850319


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/SavepointKeyFilter.java:
##########
@@ -79,43 +83,44 @@ default BoundInfo getUpperBound() {
      *
      * <p>Used only while combining filters during push-down translation, not during the scan.
      */
-    default SavepointKeyFilter intersect(SavepointKeyFilter other) {
+    default SavepointKeyFilter<K> intersect(SavepointKeyFilter<K> other) {
         throw new UnsupportedOperationException(
                 getClass().getSimpleName() + " does not support intersect()");
     }
 
-    static SavepointKeyFilter filterKeys(Set<Object> keys, SavepointKeyFilter predicate) {
-        final Set<Object> retained = new HashSet<>();
-        for (Object key : keys) {
+    static <K> SavepointKeyFilter<K> filterKeys(Set<K> keys, SavepointKeyFilter<K> predicate) {
+        final Set<K> retained = new HashSet<>();
+        for (K key : keys) {
             if (predicate.test(key)) {
                 retained.add(key);
             }
         }
         return exact(retained);
     }
 
-    static SavepointKeyFilter exact(Set<Object> keys) {
+    static <K> SavepointKeyFilter<K> exact(Set<K> keys) {
         if (keys.isEmpty()) {
-            return EmptyKeyFilter.INSTANCE;
+            return EmptyKeyFilter.instance();
         }
-        return new ExactKeyFilter(keys);
+        return new ExactKeyFilter<>(keys);
     }
 
-    static SavepointKeyFilter exact(Object value) {
-        return new ExactKeyFilter(Set.of(value));
+    static <K> SavepointKeyFilter<K> exact(K value) {
+        return new ExactKeyFilter<>(Set.of(value));
     }
 
-    static SavepointKeyFilter range(
+    static <K> SavepointKeyFilter<K> range(
             @Nullable Comparable<?> lower,
             boolean lowerInclusive,
             @Nullable Comparable<?> upper,
             boolean upperInclusive) {
         BoundInfo lowerBoundInfo = lower != null ? new BoundInfo(lower, lowerInclusive) : null;
         BoundInfo upperBoundInfo = upper != null ? new BoundInfo(upper, upperInclusive) : null;
-        return new RangeKeyFilter(lowerBoundInfo, upperBoundInfo);
+        return new RangeKeyFilter<>(lowerBoundInfo, upperBoundInfo);
     }
 
-    static SavepointKeyFilter empty() {
-        return EmptyKeyFilter.INSTANCE;
+    @SuppressWarnings("unchecked")

Review Comment:
   Fixed.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/EmptyKeyFilter.java:
##########
@@ -22,16 +22,22 @@
 import java.util.Set;
 
 /** A filter that rejects every key. */
-final class EmptyKeyFilter implements SavepointKeyFilter {
+final class EmptyKeyFilter<K> implements SavepointKeyFilter<K> {
 
     private static final long serialVersionUID = 1L;
 
-    static final EmptyKeyFilter INSTANCE = new EmptyKeyFilter();
+    @SuppressWarnings("rawtypes")
+    static final EmptyKeyFilter INSTANCE = new EmptyKeyFilter<>();

Review Comment:
   Fixed.



##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/SavepointKeyFilter.java:
##########
@@ -79,43 +83,44 @@ default BoundInfo getUpperBound() {
      *
      * <p>Used only while combining filters during push-down translation, not during the scan.
      */
-    default SavepointKeyFilter intersect(SavepointKeyFilter other) {
+    default SavepointKeyFilter<K> intersect(SavepointKeyFilter<K> other) {
         throw new UnsupportedOperationException(
                 getClass().getSimpleName() + " does not support intersect()");
     }
 
-    static SavepointKeyFilter filterKeys(Set<Object> keys, SavepointKeyFilter predicate) {
-        final Set<Object> retained = new HashSet<>();
-        for (Object key : keys) {
+    static <K> SavepointKeyFilter<K> filterKeys(Set<K> keys, SavepointKeyFilter<K> predicate) {
+        final Set<K> retained = new HashSet<>();
+        for (K key : keys) {
             if (predicate.test(key)) {
                 retained.add(key);
             }
         }
         return exact(retained);
     }
 
-    static SavepointKeyFilter exact(Set<Object> keys) {
+    static <K> SavepointKeyFilter<K> exact(Set<K> keys) {
         if (keys.isEmpty()) {
-            return EmptyKeyFilter.INSTANCE;
+            return EmptyKeyFilter.instance();
         }
-        return new ExactKeyFilter(keys);
+        return new ExactKeyFilter<>(keys);
     }
 
-    static SavepointKeyFilter exact(Object value) {
-        return new ExactKeyFilter(Set.of(value));
+    static <K> SavepointKeyFilter<K> exact(K value) {
+        return new ExactKeyFilter<>(Set.of(value));
     }
 
-    static SavepointKeyFilter range(
+    static <K> SavepointKeyFilter<K> range(

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to