wuchong commented on a change in pull request #14708: URL: https://github.com/apache/flink/pull/14708#discussion_r563306124
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java ########## @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.slicing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.util.IterableIterator; +import org.apache.flink.util.MathUtils; + +import org.apache.commons.math3.util.ArithmeticUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Utilities to create {@link SliceAssigner}s. 
*/ +@Internal +public final class SliceAssigners { + + // ------—------—------—------—------—------—------—------—------—------—------—------—------— + // Utilities + // ------—------—------—------—------—------—------—------—------—------—------—------—------— + + /** + * Creates a tumbling window {@link SliceAssigner} that assigns elements to slices of tumbling + * windows. + * + * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on + * processing time. + * @param size the size of the generated windows. + */ + public static TumblingSliceAssigner tumbling(int rowtimeIndex, Duration size) { + return new TumblingSliceAssigner(rowtimeIndex, size.toMillis(), 0); + } + + /** + * Creates a hopping window {@link SliceAssigner} that assigns elements to slices of hopping + * windows. + * + * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on * + * processing time. + * @param size the size of the generated windows. + * @param slide the slide interval of the generated windows. + */ + public static HoppingSliceAssigner hopping(int rowtimeIndex, Duration size, Duration slide) { + return new HoppingSliceAssigner(rowtimeIndex, size.toMillis(), slide.toMillis(), 0); + } + + /** + * Creates a cumulative window {@link SliceAssigner} that assigns elements to slices of + * cumulative windows. + * + * @param rowtimeIndex the index of rowtime field in the input row, {@code -1} if based on * + * processing time. + * @param maxSize the max size of the generated windows. + * @param step the step interval of the generated windows. + */ + public static CumulativeSliceAssigner cumulative( + int rowtimeIndex, Duration maxSize, Duration step) { + return new CumulativeSliceAssigner(rowtimeIndex, maxSize.toMillis(), step.toMillis(), 0); + } + + /** + * Creates a {@link SliceAssigner} that assigns elements which has been attached window start + * and window end timestamp to slices. 
The assigned slice is equal to the given window. + * + * @param windowEndIndex the index of window end field in the input row, mustn't be a negative + * value. + * @param windowSize the size of the generated window. + */ + public static WindowedSliceAssigner windowed(int windowEndIndex, Duration windowSize) { + return new WindowedSliceAssigner(windowEndIndex, windowSize.toMillis()); + } + + // ------—------—------—------—------—------—------—------—------—------—------—------—------— + // Slice Assigners + // ------—------—------—------—------—------—------—------—------—------—------—------—------— + + /** The {@link SliceAssigner} for tumbling windows. */ + public static final class TumblingSliceAssigner extends AbstractSliceAssigner + implements SliceUnsharedAssigner { + private static final long serialVersionUID = 1L; + + /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */ + public TumblingSliceAssigner withOffset(Duration offset) { + return new TumblingSliceAssigner(rowtimeIndex, size, offset.toMillis()); + } + + private final long size; + private final long offset; + private final ReusableListIterable reuseList = new ReusableListIterable(); + + private TumblingSliceAssigner(int rowtimeIndex, long size, long offset) { + super(rowtimeIndex); + checkArgument( + size > 0, + String.format( + "Tumbling Window parameters must satisfy size > 0, but got size %dms.", + size)); + checkArgument( + Math.abs(offset) < size, + String.format( + "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.", Review comment: This is on purpose, to make the message more concise, e.g. `got size 10000ms and offset 20000ms`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org