prashantbh commented on code in PR #26441:
URL: https://github.com/apache/flink/pull/26441#discussion_r2039503996


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.java:
##########
@@ -21,43 +21,81 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.io.Serializable;
 import java.util.concurrent.ThreadLocalRandom;
 
-/** A {@code WindowStagger} staggers offset in runtime for each window 
assignment. */
+/**
+ * Interface for defining how window assignments should be staggered to 
distribute load over time.
+ * Implementations determine the offset applied during window assignment.
+ */
 @PublicEvolving
-public enum WindowStagger {
-    /** Default mode, all panes fire at the same time across all partitions. */
-    ALIGNED {
-        @Override
-        public long getStaggerOffset(final long currentProcessingTime, final 
long size) {
-            return 0L;
-        }
-    },
+public interface WindowStagger extends Serializable {
+
+    /**
+     * Calculates the stagger offset for a window assignment based on the 
current processing time
+     * and the window size.
+     *
+     * @param currentProcessingTime The current processing time.
+     * @param size The size of the window.
+     * @return The calculated stagger offset in milliseconds.
+     */
+    long getStaggerOffset(final long currentProcessingTime, final long size);
+
+    // Pre-defined Staggering Strategies
+    /** Default mode: No staggering, all panes fire at the same time across 
all partitions. */
+    WindowStagger ALIGNED =
+            new WindowStagger() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public long getStaggerOffset(final long currentProcessingTime, 
final long size) {
+                    return 0L;
+                }
+
+                @Override
+                public String toString() {
+                    return "ALIGNED";
+                }
+            };
 
     /**
      * Stagger offset is sampled from uniform distribution U(0, WindowSize) 
when first event
      * ingested in the partitioned operator.
      */
-    RANDOM {
-        @Override
-        public long getStaggerOffset(final long currentProcessingTime, final 
long size) {
-            return (long) (ThreadLocalRandom.current().nextDouble() * size);
-        }
-    },
+    WindowStagger RANDOM =
+            new WindowStagger() {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public long getStaggerOffset(final long currentProcessingTime, 
final long size) {
+                    return (long) (ThreadLocalRandom.current().nextDouble() * 
size);
+                }
+
+                @Override
+                public String toString() {
+                    return "RANDOM";
+                }
+            };
 
     /**
      * When the first event is received in the window operator, take the 
difference between the
      * start of the window and current procesing time as the offset. This way, 
windows are staggered
      * based on when each parallel operator receives the first event.
      */
-    NATURAL {
-        @Override
-        public long getStaggerOffset(final long currentProcessingTime, final 
long size) {
-            final long currentProcessingWindowStart =
-                    TimeWindow.getWindowStartWithOffset(currentProcessingTime, 
0, size);
-            return Math.max(0, currentProcessingTime - 
currentProcessingWindowStart);
-        }
-    };
-
-    public abstract long getStaggerOffset(final long currentProcessingTime, 
final long size);
+    WindowStagger NATURAL =
+            new WindowStagger() {
+                private static final long serialVersionUID = 1L; // Add 
serialVersionUID
+
+                @Override
+                public long getStaggerOffset(final long currentProcessingTime, 
final long size) {
+                    final long currentProcessingWindowStart =
+                            
TimeWindow.getWindowStartWithOffset(currentProcessingTime, 0, size);
+                    return Math.max(0, currentProcessingTime - 
currentProcessingWindowStart);
+                }
+
+                @Override
+                public String toString() {
+                    return "NATURAL"; // Optional: for better readability

Review Comment:
   I will remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to