TengHu commented on a change in pull request #12297:
URL: https://github.com/apache/flink/pull/12297#discussion_r432696612



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
##########
@@ -46,28 +46,44 @@
 
        private final long size;
 
-       private final long offset;
+       private final long globalOffset;
 
-       private TumblingProcessingTimeWindows(long size, long offset) {
+       private Long staggerOffset = null;
+
+       private final WindowStagger windowStagger;
+
+       private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
                if (Math.abs(offset) >= size) {
                         throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
                }
 
                this.size = size;
-               this.offset = offset;
+               this.globalOffset = offset;
+               this.windowStagger = windowStagger;
        }
 
        @Override
         public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
                final long now = context.getCurrentProcessingTime();
-               long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
+               if (staggerOffset == null) {
+                       staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);

Review comment:
       Yes, your interpretation is correct: the intention was to align the 
panes with respect to the first event ever ingested.
   
   Events arrive at different times across partitions. The differences were 
usually caused by the design of the partition keys, network delay, etc., which 
led to an implicit staggering (a normal distribution in our case).
   
   This is useful because, compared to ALIGNED and RANDOM, this option staggers 
the panes but still preserves some useful alignment (for example, in our 
geospatial applications, windows on events partitioned by city still trigger 
at the same time if the cities are in the same time zone).
   
   Therefore, we think this would be a useful option for staggering.
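
   To make the idea concrete, here is a rough, self-contained sketch of the 
arithmetic, not the code in this PR. The class and method names 
(`NaturalStaggerSketch`, `naturalStaggerOffset`, `windowStart`) are made up for 
illustration, and the window-start formula is assumed to be the usual 
tumbling-window arithmetic with an offset.

```java
public class NaturalStaggerSketch {

    // Hypothetical helper: the stagger is the position of the first observed
    // processing time inside its size-wide interval.
    public static long naturalStaggerOffset(long firstProcessingTime, long size) {
        return Math.max(0, firstProcessingTime % size);
    }

    // Assumed tumbling-window start arithmetic for a given offset.
    public static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    public static void main(String[] args) {
        long size = 60_000L; // 1-minute windows

        // Two partitions whose first events arrive at the same instant
        // get the same stagger, so their windows stay aligned.
        long staggerA = naturalStaggerOffset(125_000L, size);
        long staggerB = naturalStaggerOffset(125_000L, size);

        // A partition whose first event arrives 15 s later is shifted.
        long staggerC = naturalStaggerOffset(140_000L, size);

        System.out.println("A window start: " + windowStart(130_000L, staggerA, size));
        System.out.println("B window start: " + windowStart(130_000L, staggerB, size));
        System.out.println("C window start: " + windowStart(145_000L, staggerC, size));
    }
}
```

   With these numbers, partitions A and B share a stagger of 5 s and therefore 
the same window boundaries, while partition C is shifted to a 20 s stagger.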




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

