ableegoldman commented on a change in pull request #10926:
URL: https://github.com/apache/kafka/pull/10926#discussion_r658345192



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long 
advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws 
IllegalArgumentException {
+        return ofSizeAndGrace(size, ofMillis(NO_GRACE_PERIOD));
+    }
+
+    /**
+     * Reject out-of-order events that arrive more than {@code 
millisAfterWindowEnd} after the end of its window.

Review comment:
       The javadocs here should at least contain everything in the 
`ofSizeWithNoGrace` javadocs, but maybe with a bit more on the grace period. 
Both of them are creating a new tumbling window definition

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
##########
@@ -71,6 +71,7 @@
 /**
  * Tests related to internal topics in streams
  */
+@SuppressWarnings("deprecation")

Review comment:
       Ok, I see now that there are a lot of tests using the deprecated 
methods. We should absolutely migrate them all to the new APIs (of course we 
will be forced to once the deprecated ones are removed), but it's ok with me, 
and maybe even slightly preferable, to do this in a followup PR. That way we 
can keep this PR focused on the changes themselves and just the tests for the 
new APIs.
   
   Can you file a ticket to migrate all of the remaining tests over to the new 
APIs and remove the warning suppression? Then you can start working on that 
after this PR is merged, or in parallel with the review but on a separate branch

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##########
@@ -92,32 +100,58 @@ private JoinWindows(final long beforeMs,
         if (beforeMs + afterMs < 0) {
             throw new IllegalArgumentException("Window interval (ie, 
beforeMs+afterMs) must not be negative.");
         }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Grace period must not be 
negative.");
+        }
+
         this.afterMs = afterMs;
         this.beforeMs = beforeMs;
         this.graceMs = graceMs;
         this.enableSpuriousResultFix = enableSpuriousResultFix;
     }
 
+    /**
+     * Reject out-of-order events that are delayed more than {@code 
afterWindowEnd}
+     * after the end of its window.
+     * <p>
+     * Delay is defined as (stream_time - record_timestamp).
+     *
+     * @param timeDifference join window interval
+     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
+     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative of can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration afterWindowEnd) {
         return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), afterWindowEnd.toMillis(), true);
     }
 
+    /**
+     * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
+     * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
+     * the timestamp of the record from the primary stream.
+     *
+     * @param timeDifference join window interval
+     * @throws IllegalArgumentException if {@code timeDifference} is negative 
or can't be represented as {@code long milliseconds}
+     */
     public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration 
timeDifference) {
-        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), 0L, true);
+        return new JoinWindows(timeDifference.toMillis(), 
timeDifference.toMillis(), NO_GRACE_PERIOD, true);
     }
 
-     /**
+    /**
      * Specifies that records of the same key are joinable if their timestamps 
are within {@code timeDifference},
      * i.e., the timestamp of a record from the secondary stream is max {@code 
timeDifference} earlier or later than
      * the timestamp of the record from the primary stream.
      *
-     * @param timeDifference join window interval
+     * @param timeDifference
+     * @return

Review comment:
       nit: either remove this line or fill it out, eg `@return a new 
JoinWindows specification with a 24hour grace period`. Probably good to make 
sure this javadoc is consistent across all the builder methods in this class, 
ie either all methods have the `@return` specified, or none of them do. Same 
for any of the other windows classes

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -341,6 +341,7 @@ private void 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic(fina
         }
     }
 
+    @SuppressWarnings("deprecation")

Review comment:
       Same here, and for all the tests really -- any test should be migrated 
to use the non-deprecated version of the API, unless it's explicitly testing 
the behavior of the old, now-deprecated, method

##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -62,6 +62,7 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation")

Review comment:
       We should migrate this to use the new non-deprecated methods instead of 
just suppressing the warning

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
##########
@@ -83,16 +92,52 @@ private TimeWindows(final long sizeMs, final long 
advanceMs, final long graceMs)
      * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
      *
      * @param size The size of the window
-     * @return a new window definition with default maintain duration of 1 day
+     * @return a new window definition with default no grace period
+     * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
+     */
+    public static TimeWindows ofSizeWithNoGrace(final Duration size) throws 
IllegalArgumentException {

Review comment:
       We should call out explicitly that this is setting the grace period to 
0, which means that out of order records arriving after the window end will be 
dropped. Otherwise it's too easy to just use this method without thinking any 
further about the grace period and what it means/whether you want it

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##########
@@ -78,6 +82,43 @@
     private SlidingWindows(final long timeDifferenceMs, final long graceMs) {
         this.timeDifferenceMs = timeDifferenceMs;
         this.graceMs = graceMs;
+
+        if (timeDifferenceMs < 0) {
+            throw new IllegalArgumentException("Window time difference must 
not be negative.");
+        }
+
+        if (graceMs < 0) {
+            throw new IllegalArgumentException("Window grace period must not 
be negative.");
+        }
+    }
+
+    /**
+     * Return a window definition with the window size

Review comment:
       ```suggestion
        * Return a window definition with the window size and no grace period. 
Note that this means out of order records arriving after the window end will be 
dropped.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to