soin08 commented on code in PR #28590:
URL: https://github.com/apache/flink/pull/28590#discussion_r3509004362


##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/SavepointKeyFilter.java:
##########
@@ -111,9 +112,18 @@ static <K> SavepointKeyFilter<K> exact(K value) {
 
     static <K extends Comparable<K>> SavepointKeyFilter<K> range(
             @Nullable K lower, boolean lowerInclusive, @Nullable K upper, 
boolean upperInclusive) {
-        BoundInfo lowerBoundInfo = lower != null ? new BoundInfo(lower, 
lowerInclusive) : null;
-        BoundInfo upperBoundInfo = upper != null ? new BoundInfo(upper, 
upperInclusive) : null;
-        return new RangeKeyFilter<>(lowerBoundInfo, upperBoundInfo);
+        return range(lower, lowerInclusive, upper, upperInclusive, 
Comparator.naturalOrder());
+    }
+
+    static <K> SavepointKeyFilter<K> range(
+            @Nullable K lower,
+            boolean lowerInclusive,
+            @Nullable K upper,
+            boolean upperInclusive,
+            Comparator<K> comparator) {

Review Comment:
   I think the comparator must be Serializable — the filter is shipped with the 
job. A plain Comparator/lambda would compile but throw NotSerializableException 
at submission.
   
   A quick search in Flink's codebase shows  that Flink handles this with a 
named type used as the field/param type, e.g.:
   - `EventComparator<T> extends Comparator<T>, Serializable` (flink-cep) — 
public generic interface
   - `RecordComparator extends Comparator<RowData>, Serializable` 
(flink-table-runtime)
   - (as a class) `ComparableComparator<K> implements Comparator<K>, 
Serializable (SortedMapTypeInfo)`
   
   Suggestion: add `public interface SerializableComparator<K> extends 
Comparator<K>, Serializable {}` and change the overload param to 
`SerializableComparator<K> comparator` (callers can still pass a lambda, no 
cast), plus update the doc.
   
   Additionally, we should cover this method with a unit test in 
`SavepointFilterTranslatorTest`, proving that passing a custom comparator 
changes filter behaviour. For example:
   
   ```
   void rangeWithCustomComparatorIsUsed() throws Exception {
           // Orders strings by length — clearly not the natural String order.
           SavepointKeyFilter<String> filter =
                   SavepointKeyFilter.range(
                           "aa", true, "cccc", true, (a, b) -> 
Integer.compare(a.length(), b.length()));
   
           // The custom comparator (length in [2, 4]), not natural order, 
decides membership.
           assertThat(filter.test("abc")).isTrue();
           assertThat(filter.test("a")).isFalse();
           assertThat(filter.test("ccccc")).isFalse();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to