davidradl commented on code in PR #26441: URL: https://github.com/apache/flink/pull/26441#discussion_r2039502534
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowStagger.java: ########## @@ -21,43 +21,81 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import java.io.Serializable; import java.util.concurrent.ThreadLocalRandom; -/** A {@code WindowStagger} staggers offset in runtime for each window assignment. */ +/** + * Interface for defining how window assignments should be staggered to distribute load over time. + * Implementations determine the offset applied during window assignment. + */ @PublicEvolving -public enum WindowStagger { - /** Default mode, all panes fire at the same time across all partitions. */ - ALIGNED { - @Override - public long getStaggerOffset(final long currentProcessingTime, final long size) { - return 0L; - } - }, +public interface WindowStagger extends Serializable { + + /** + * Calculates the stagger offset for a window assignment based on the current processing time + * and the window size. + * + * @param currentProcessingTime The current processing time. + * @param size The size of the window. + * @return The calculated stagger offset in milliseconds. + */ + long getStaggerOffset(final long currentProcessingTime, final long size); + + // Pre-defined Staggering Strategies + /** Default mode: No staggering, all panes fire at the same time across all partitions. */ + WindowStagger ALIGNED = + new WindowStagger() { + private static final long serialVersionUID = 1L; + + @Override + public long getStaggerOffset(final long currentProcessingTime, final long size) { + return 0L; + } + + @Override + public String toString() { + return "ALIGNED"; + } + }; /** * Stagger offset is sampled from uniform distribution U(0, WindowSize) when first event * ingested in the partitioned operator. 
*/ - RANDOM { - @Override - public long getStaggerOffset(final long currentProcessingTime, final long size) { - return (long) (ThreadLocalRandom.current().nextDouble() * size); - } - }, + WindowStagger RANDOM = + new WindowStagger() { + private static final long serialVersionUID = 1L; + + @Override + public long getStaggerOffset(final long currentProcessingTime, final long size) { + return (long) (ThreadLocalRandom.current().nextDouble() * size); + } + + @Override + public String toString() { + return "RANDOM"; + } + }; /** * When the first event is received in the window operator, take the difference between the * start of the window and current procesing time as the offset. This way, windows are staggered * based on when each parallel operator receives the first event. */ - NATURAL { - @Override - public long getStaggerOffset(final long currentProcessingTime, final long size) { - final long currentProcessingWindowStart = - TimeWindow.getWindowStartWithOffset(currentProcessingTime, 0, size); - return Math.max(0, currentProcessingTime - currentProcessingWindowStart); - } - }; - - public abstract long getStaggerOffset(final long currentProcessingTime, final long size); + WindowStagger NATURAL = + new WindowStagger() { + private static final long serialVersionUID = 1L; // Add serialVersionUID + + @Override + public long getStaggerOffset(final long currentProcessingTime, final long size) { + final long currentProcessingWindowStart = + TimeWindow.getWindowStartWithOffset(currentProcessingTime, 0, size); + return Math.max(0, currentProcessingTime - currentProcessingWindowStart); + } + + @Override + public String toString() { + return "NATURAL"; // Optional: for better readability Review Comment: nit: why does this toString have a comment but the others do not? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org