reswqa commented on code in PR #26001: URL: https://github.com/apache/flink/pull/26001#discussion_r1919639049
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets two input windows, such as in a join + * operation. + * + * @param <IN1> The type of the input1 value. + * @param <IN2> The type of the input2 value. + * @param <OUT> The type of the output value. + */ +@Experimental +public interface TwoInputNonNroadcastWindowStreamProcessFunction<IN1, IN2, OUT> + extends WindowProcessFunction { + + /** + * The {@link #onRecord1} method will be invoked when a record is received from input1. Its Review Comment: We should avoid linking a method to itself in its own Javadoc; it can be replaced by `This method`. Please also check this PR for other similar issues.
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets two input windows, such as in a join + * operation. + * + * @param <IN1> The type of the input1 value. + * @param <IN2> The type of the input2 value. + * @param <OUT> The type of the output value. 
+ */ +@Experimental +public interface TwoInputNonNroadcastWindowStreamProcessFunction<IN1, IN2, OUT> Review Comment: ```suggestion public interface TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT> ``` :) ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.datastream.api.function.ProcessFunction; + +import java.util.Collections; +import java.util.Set; + +/** + * Base interface for functions evaluated over windows, providing callback functions for various + * stages of the window's lifecycle. + */ +@Experimental +public interface WindowProcessFunction extends ProcessFunction { + + /** + * Explicitly declares states that are bound to the window upfront. Each specific window state Review Comment: ```suggestion * Explicitly declares states that are bound to the window. 
Each specific window state ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets two input windows, such as in a join + * operation. + * Review Comment: ```suggestion /** * A type of {@link WindowProcessFunction} for two input window processing, such as window-join. * ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. 
+ */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding(Duration windowSize, Duration windowSlideInterval) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @return A sliding time window strategy. 
+ */ + public static WindowStrategy sliding( + Duration windowSize, Duration windowSlideInterval, TimeType timeType) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, timeType); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; + +/** + * The {@link OneInputWindowContext} interface extends {@link WindowContext} and provides additional + * functionality for writing and reading window data of one input window. + * + * @param <IN> Type of the input elements + */ +@Experimental +public interface OneInputWindowContext<IN> extends WindowContext { + + /** Write records into the window's state. */ + void putRecord(IN record); + + /** + * Read records from the window's state, note that this cloud be null if the window is empty. Review Comment: which cloud? 
Alibaba Cloud? ☁️ ☁️ ☁️ 😉 please also check `TwoInputWindowContext`. ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate sliding TimeWindow. */ +@Experimental +public class SlidingTimeWindowStrategy extends WindowStrategy { + private Duration windowSize; + private Duration windowSlideInterval; + private TimeType timeType; + private Duration allowedLateness; Review Comment: Can be final. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.window.context.WindowContext; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class provides methods to store and retrieve state associated with windows in {@link + * WindowContext}. + * + * @param <K> Type of the window key. + * @param <W> Type of the window. + */ +public class WindowStateStore<K, W extends Window> { + + private static final Logger LOG = LoggerFactory.getLogger(WindowStateStore.class); + + /** User-defined {@link WindowProcessFunction}. */ + private final WindowProcessFunction windowProcessFunction; + + /** + * The {@link StateDeclaration}s that have been declared by the {@link + * WindowProcessFunction#useWindowStates()}. + */ + private final Set<StateDeclaration> declaredWindowStateDeclaration; Review Comment: ```suggestion private final Set<StateDeclaration> windowStateDeclarations; ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/function/InternalOneInputWindowStreamProcessFunction.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.window.function; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.Set; + +/** + * A class that wrap a {@link OneInputWindowStreamProcessFunction} to internal processing. + * + * @param <IN> Type of the input elements. + * @param <OUT> Type of the output elements. + * @param <W> Type of the window. + */ +public class InternalOneInputWindowStreamProcessFunction<IN, OUT, W extends Window> + implements OneInputStreamProcessFunction<IN, OUT> { + + /** User-defined {@link WindowProcessFunction}. */ + private final OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction; + + private final WindowAssigner<IN, W> assigner; + + private final Trigger<IN, W> trigger; + + /** + * The allowed lateness for elements. This is used for: + * + * <ul> + * <li>Deciding if an element should be dropped from a window due to lateness. 
+ * <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp Review Comment: ```suggestion * <li>Clearing the state of a window if the time out-of the {@code window.maxTimestamp ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowTriggerContext.java: ########## @@ -0,0 +1,174 @@ +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; + +/** + * {@code WindowTriggerContext} is a utility for handling {@code Trigger} invocations. It can be + * reused by setting the {@code key} and {@code window} fields. No internal state must be kept in + * the {@code WindowTriggerContext} Review Comment: What's the meaning of `No internal state must be kept in the {@code WindowTriggerContext}`? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. + */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is not a time window + * or a session window. Review Comment: ```suggestion * @return The exclusive end timestamp of this window, or -1 if the window is * a session window or not a time window. 
``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. 
Review Comment: ```suggestion * @return The starting timestamp of this window, or -1 if the window is a * session window or not a time window. ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets two output windows. Review Comment: ```suggestion * A type of {@link WindowProcessFunction} for two-output window processing. 
``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java: ########## @@ -121,4 +127,89 @@ public static <KEY, T1, T2, OUT> NonKeyedPartitionStream<OUT> join( joinFunction, joinType); } + + // =================== Window =========================== + + static final Class<?> WINDOW_FUNCS_INSTANCE; + + static { + try { + WINDOW_FUNCS_INSTANCE = + Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinWindowFuncs"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Please ensure that flink-datastream in your class path"); + } + } + + /** + * Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a + * OneInputStreamProcessFunction to perform the window operation. + * + * @param windowStrategy the window strategy + * @param windowProcessFunction the window process function + * @return the wrapped process function + */ + public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> window( + WindowStrategy windowStrategy, + OneInputWindowStreamProcessFunction<IN, OUT> windowProcessFunction) { + try { + return (OneInputStreamProcessFunction<IN, OUT>) + WINDOW_FUNCS_INSTANCE + .getMethod( + "window", + WindowStrategy.class, + OneInputWindowStreamProcessFunction.class) + .invoke(null, windowStrategy, windowProcessFunction); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Wrap the WindowStrategy and TwoInputNonNroadcastWindowStreamProcessFunction within a Review Comment: ```suggestion * Wrap the WindowStrategy and TwoInputNonBroadcastWindowStreamProcessFunction within a ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets one input windows. + * + * @param <IN> The type of the input value. + * @param <OUT> The type of the output value. + */ +@Experimental +public interface OneInputWindowStreamProcessFunction<IN, OUT> extends WindowProcessFunction { + + /** + * The {@link #onRecord} method will be invoked when a record is received. Its default behavior + * is to store data in built-in window state by {@code WindowContext#putRecord}. If the user + * overrides this method, they will need to update the window state as necessary. Review Comment: ```suggestion * The {@link #onRecord} method will be invoked when a record is received. Its default behavior * is to store data in built-in window state by {@link OneInputWindowContext#putRecord}. If the user * overrides this method, they have to take care of the input data themselves. ``` Same for other `XXXWindowStreamProcessFunction`. 
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.function; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; + +/** + * A type of {@link WindowProcessFunction} that targets one input windows. Review Comment: ```suggestion * A type of {@link WindowProcessFunction} for one-input window processing. 
``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java: ########## @@ -70,13 +74,26 @@ public <OUT> ProcessConfigurableAndGlobalStream<OUT> process( TypeInformation<OUT> outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); - ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction); - return StreamUtils.wrapWithConfigureHandle( - transform( - "Global Process", - outType, - operator, - AttributeParser.parseAttribute(processFunction))); + + if (processFunction instanceof InternalOneInputWindowStreamProcessFunction) { + // Transform to keyed stream. + KeyedPartitionStreamImpl<Byte, T> keyedStream = Review Comment: Why do we construct a keyed stream first and then convert it to a global stream? It feels somewhat weird; if you just want to use the transformWindow method of the keyed stream, why not extract it into StreamUtils? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. + */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is not a time window + * or a session window. + */ + long getEndTime(); + + /** + * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. 
+ */ Review Comment: ```suggestion * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state * that is scoped to the window. ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -247,6 +299,50 @@ public <OUT1, OUT2> ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2> environment, firstTransformation, firstStream, secondStream); } + public <OUT1, OUT2> Transformation<OUT1> transformTwoOutputWindow( + Tuple2<TypeInformation<OUT1>, TypeInformation<OUT2>> twoOutputType, + OutputTag<OUT2> secondOutputTag, + TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction) { + Transformation<OUT1> transform; + TypeInformation<OUT1> firstOutputType = twoOutputType.f0; + TypeInformation<OUT2> secondOutputType = twoOutputType.f1; + if (processFunction instanceof InternalTwoOutputWindowStreamProcessFunction) { + InternalTwoOutputWindowStreamProcessFunction<V, OUT1, OUT2, ?> internalWindowFunction = + (InternalTwoOutputWindowStreamProcessFunction<V, OUT1, OUT2, ?>) + processFunction; + WindowAssigner<V, ?> assigner = internalWindowFunction.getAssigner(); + ListStateDescriptor<V> stateDesc = + new ListStateDescriptor<>( + "window-iterator-state", + TypeExtractor.createTypeInfo(getType().getTypeClass())); + + TwoOutputWindowProcessOperator windowProcessOperator = + new TwoOutputWindowProcessOperator( + internalWindowFunction, + secondOutputTag, + null, + null, + assigner, + internalWindowFunction.getTrigger(), + assigner.getWindowSerializer(environment.getExecutionConfig()), + stateDesc, + internalWindowFunction.getAllowedLateness()); + transform = + StreamUtils.getOneInputKeyedTransformation( + "Two-Output-Process", + this, + firstOutputType, + windowProcessOperator, + keySelector, + keyType); + // transform = oneInputTransformWithOperator("Window", firstOutputType, + // windowProcessOperator); Review Comment: Why is this commented-out code still here? If it is no longer needed, please remove it.
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. 
+ */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is not a time window + * or a session window. + */ + long getEndTime(); + + /** + * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. + */ + <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. Review Comment: ```suggestion * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state * that is scoped to the window. ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -424,6 +563,31 @@ public <T_OTHER, OUT> Transformation<OUT> getJoinTransformation( joinProcessOperator); } + private <R> Transformation<R> oneInputTransformWithOperator( Review Comment: Can this be replaced by `StreamUtils.getOneInputKeyedTransformation`? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. 
+ * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding(Duration windowSize, Duration windowSlideInterval) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. + */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is not a time window + * or a session window. + */ + long getEndTime(); + + /** + * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. 
+ */ + <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. + */ + <KEY, V> Optional<MapState<KEY, V>> getWindowState(MapStateDeclaration<KEY, V> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. Review Comment: ```suggestion * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state * that is scoped to the window. ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.window.context.WindowContext; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class provides methods to store and retrieve state associated with windows in {@link + * WindowContext}. + * + * @param <K> Type of the window key. + * @param <W> Type of the window. + */ +public class WindowStateStore<K, W extends Window> { + + private static final Logger LOG = LoggerFactory.getLogger(WindowStateStore.class); + + /** User-defined {@link WindowProcessFunction}. 
+ */ + private final WindowProcessFunction windowProcessFunction; + + /** + * The {@link StateDeclaration}s that have been declared by the {@link + * WindowProcessFunction#useWindowStates()}. + */ + private final Set<StateDeclaration> declaredWindowStateDeclaration; + + /** The operator to which the window belongs, used for creating and retrieving window state. */ + private final AbstractAsyncStateStreamOperator<?> operator; + + private final TypeSerializer<W> windowSerializer; + + /** Whether the window is a merging window. */ + private final boolean isMergingWindow; + + public WindowStateStore( + WindowProcessFunction windowProcessFunction, + AbstractAsyncStateStreamOperator<?> operator, + TypeSerializer<W> windowSerializer, + boolean isMergingWindow) { + this.windowProcessFunction = windowProcessFunction; + this.declaredWindowStateDeclaration = windowProcessFunction.useWindowStates(); + this.operator = operator; + this.windowSerializer = windowSerializer; + this.isMergingWindow = isMergingWindow; + } + + private boolean isStateDeclared(StateDeclaration stateDeclaration) { + if (!declaredWindowStateDeclaration.contains(stateDeclaration)) { + LOG.warn( + "Fail to get window state for " + + stateDeclaration.getName() + + ", please declare the used state in the `WindowProcessFunction#useWindowStates` method first."); + return false; + } + return true; + } + + private boolean stateRedistributionModeIsNone(StateDeclaration stateDeclaration) { + StateDeclaration.RedistributionMode redistributionMode = + stateDeclaration.getRedistributionMode(); + return redistributionMode == StateDeclaration.RedistributionMode.NONE; + } Review Comment: I think we should invert this method's semantics, because every caller checks its negation.
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. 
+ */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.api.extension.window.context; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ValueState; + +import java.util.Optional; + +/** + * The {@link WindowContext} interface represents a context for window operations and provides + * methods to interact with state that is scoped to the window. + */ +@Experimental +public interface WindowContext { + + /** + * Gets the starting timestamp of the window. This is the first timestamp that belongs to this + * window. + * + * @return The starting timestamp of this window, or -1 if the window is not a time window or a + * session window. + */ + long getStartTime(); + + /** + * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it is the + * first timestamp that does not belong to this window anymore. + * + * @return The exclusive end timestamp of this window, or -1 if the window is not a time window + * or a session window. + */ + long getEndTime(); + + /** + * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. + */ + <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. 
+ */ + <KEY, V> Optional<MapState<KEY, V>> getWindowState(MapStateDeclaration<KEY, V> stateDeclaration) + throws Exception; + + /** + * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state + * that is scoped to the window and key of the current trigger invocation. + */ + <T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> stateDeclaration) + throws Exception; +} Review Comment: It seems that `ReducingStateDeclaration` and `AggregatingStateDeclaration` were missed. ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ Review Comment: The doc here is not precise enough. IMO, WindowStrategy describes what kind of windows to use, including the strategies for dividing, triggering, and clearing windows.
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. 
+ */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. 
+ * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding(Duration windowSize, Duration windowSlideInterval) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding( + Duration windowSize, Duration windowSlideInterval, TimeType timeType) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, timeType); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A sliding time window strategy. 
+ */ + public static WindowStrategy sliding( + Duration windowSize, + Duration windowSlideInterval, + TimeType timeType, + Duration allowedLateness) { + return new SlidingTimeWindowStrategy( + windowSize, windowSlideInterval, timeType, allowedLateness); + } + + // ============== session window ================ + + /** + * Create a session time window strategy with the event time default time type. Note that + * session time windows can only be used in KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate tumbling TimeWindow. */ +@Experimental +public class TumblingTimeWindowStrategy extends WindowStrategy { + private Duration windowSize; + private TimeType timeType; + private Duration allowedLateness; Review Comment: Can be final. 
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. Review Comment: We should not assume that users already know when windows are assigned, triggered, and cleaned up for each type of window. Could you describe these details in the Javadoc for each window type? 
########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. 
+ */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can only be used in KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; +import java.time.Duration; + +/** The WindowStrategy defines how to generate Window in the stream. */ +@Experimental +public class WindowStrategy implements Serializable { + + public static final TimeType PROCESSING_TIME = TimeType.PROCESSING; + public static final TimeType EVENT_TIME = TimeType.EVENT; + + /** The types of time used in window operations. */ + @Experimental + public enum TimeType { + PROCESSING, + EVENT + } + + // ============== global window ================ + + /** + * Creates a global window strategy. Note that the global window can be used in both + * GlobalStream, KeyedStream, NonKeyedStream. + * + * @return A global window strategy. + */ + public static WindowStrategy global() { + return new GlobalWindowStrategy(); + } + + // ============== tumbling time window ================ + + /** + * Create a tumbling time window strategy with the event time default time type. Note that + * tumbling time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @return A tumbling time window strategy. 
+ */ + public static WindowStrategy tumbling(Duration windowSize) { + return new TumblingTimeWindowStrategy(windowSize); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling(Duration windowSize, TimeType timeType) { + return new TumblingTimeWindowStrategy(windowSize, timeType); + } + + /** + * Create a tumbling time window strategy. Note that tumbling time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A tumbling time window strategy. + */ + public static WindowStrategy tumbling( + Duration windowSize, TimeType timeType, Duration allowedLateness) { + return new TumblingTimeWindowStrategy(windowSize, timeType, allowedLateness); + } + + // ============== sliding time window ================ + + /** + * Create a sliding time window strategy with the event time default time type. Note that + * sliding time windows can only be used in KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding(Duration windowSize, Duration windowSlideInterval) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @return A sliding time window strategy. 
+ */ + public static WindowStrategy sliding( + Duration windowSize, Duration windowSlideInterval, TimeType timeType) { + return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, timeType); + } + + /** + * Create a sliding time window strategy. Note that sliding time windows can only be used in + * KeyedStream. + * + * @param windowSize the size of Window. + * @param windowSlideInterval the slide interval of Window. + * @param timeType the time type of Window. + * @param allowedLateness the allowed lateness of Window. + * @return A sliding time window strategy. + */ + public static WindowStrategy sliding( + Duration windowSize, + Duration windowSlideInterval, + TimeType timeType, + Duration allowedLateness) { + return new SlidingTimeWindowStrategy( + windowSize, windowSlideInterval, timeType, allowedLateness); + } + + // ============== session window ================ + + /** + * Create a session time window strategy with the event time default time type. Note that + * session time windows can only be used in KeyedStream. + * + * @param sessionGap the timeout of session. + * @return A session window strategy. + */ + public static WindowStrategy session(Duration sessionGap) { + return new SessionWindowStrategy(sessionGap); + } + + /** + * Create a session time window strategy. Note that session time windows can only be used in + * KeyedStream. Review Comment: How about GlobalStream? ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/WindowUtils.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.utils; + +import org.apache.flink.datastream.api.extension.window.strategy.GlobalWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.SessionWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.SlidingTimeWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.TumblingTimeWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** Some utils used in Window extension. 
*/ Review Comment: ```suggestion /** Utilities for Window extension. */ ``` ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate Session Windows. */ +@Experimental +public class SessionWindowStrategy extends WindowStrategy { + private Duration sessionGap; + private TimeType timeType; Review Comment: Can be final. ########## flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.extension.window.strategy; + +import org.apache.flink.annotation.Experimental; + +import java.time.Duration; + +/** A {@link WindowStrategy} used to generate tumbling TimeWindow. */ Review Comment: This is a public API, so we should provide more details about how tumbling windows work. The same applies to the other strategies. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.window.context.WindowContext; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class provides methods to store and retrieve state associated with windows in {@link + * WindowContext}. + * + * @param <K> Type of the window key. + * @param <W> Type of the window. + */ +public class WindowStateStore<K, W extends Window> { + + private static final Logger LOG = LoggerFactory.getLogger(WindowStateStore.class); + + /** User-defined {@link WindowProcessFunction}. 
*/ + private final WindowProcessFunction windowProcessFunction; + + /** + * The {@link StateDeclaration}s that have been declared by the {@link + * WindowProcessFunction#useWindowStates()}. + */ + private final Set<StateDeclaration> declaredWindowStateDeclaration; + + /** The operator to which the window belongs, used for creating and retrieving window state. */ + private final AbstractAsyncStateStreamOperator<?> operator; + + private final TypeSerializer<W> windowSerializer; + + /** Whether the window is a merging window. */ + private final boolean isMergingWindow; + + public WindowStateStore( + WindowProcessFunction windowProcessFunction, + AbstractAsyncStateStreamOperator<?> operator, + TypeSerializer<W> windowSerializer, + boolean isMergingWindow) { + this.windowProcessFunction = windowProcessFunction; + this.declaredWindowStateDeclaration = windowProcessFunction.useWindowStates(); + this.operator = operator; + this.windowSerializer = windowSerializer; + this.isMergingWindow = isMergingWindow; + } + + private boolean isStateDeclared(StateDeclaration stateDeclaration) { + if (!declaredWindowStateDeclaration.contains(stateDeclaration)) { + LOG.warn( + "Fail to get window state for " + + stateDeclaration.getName() + + ", please declare the used state in the `WindowProcessFunction#useWindowStates` method first."); + return false; + } + return true; + } + + private boolean stateRedistributionModeIsNone(StateDeclaration stateDeclaration) { + StateDeclaration.RedistributionMode redistributionMode = + stateDeclaration.getRedistributionMode(); + return redistributionMode == StateDeclaration.RedistributionMode.NONE; + } + + /** Retrieve window state of list type. 
*/ + public <T> Optional<ListState<T>> getWindowState( + ListStateDeclaration<T> stateDeclaration, W namespace) { + checkState( + !isMergingWindow, + "Retrieving the window state is not permitted when using merging windows, such as session windows."); + + if (!isStateDeclared(stateDeclaration)) { + return Optional.empty(); + } + + if (!stateRedistributionModeIsNone(stateDeclaration)) { + throw new UnsupportedOperationException( + "RedistributionMode " + + stateDeclaration.getRedistributionMode().name() + + " is not supported for window state."); + } + + ListStateDescriptor<T> stateDescriptor = + new ListStateDescriptor<T>( + stateDeclaration.getName(), + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); + + try { + ListStateAdaptor<K, W, T> state = + operator.getOrCreateKeyedState(namespace, windowSerializer, stateDescriptor); + state.setCurrentNamespace(namespace); + return Optional.of(state); + } catch (Exception e) { + return Optional.empty(); Review Comment: The exception should be propagated to the caller. Otherwise, the user cannot distinguish between an empty state and an illegal access. The same applies to the other getWindowState implementations. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.window.context.WindowContext; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class provides methods to store and 
retrieve state associated with windows in {@link + * WindowContext}. + * + * @param <K> Type of the window key. + * @param <W> Type of the window. + */ +public class WindowStateStore<K, W extends Window> { + + private static final Logger LOG = LoggerFactory.getLogger(WindowStateStore.class); + + /** User-defined {@link WindowProcessFunction}. */ + private final WindowProcessFunction windowProcessFunction; + + /** + * The {@link StateDeclaration}s that have been declared by the {@link + * WindowProcessFunction#useWindowStates()}. + */ + private final Set<StateDeclaration> declaredWindowStateDeclaration; + + /** The operator to which the window belongs, used for creating and retrieving window state. */ + private final AbstractAsyncStateStreamOperator<?> operator; + + private final TypeSerializer<W> windowSerializer; + + /** Whether the window is a merging window. */ + private final boolean isMergingWindow; + + public WindowStateStore( + WindowProcessFunction windowProcessFunction, + AbstractAsyncStateStreamOperator<?> operator, + TypeSerializer<W> windowSerializer, + boolean isMergingWindow) { + this.windowProcessFunction = windowProcessFunction; + this.declaredWindowStateDeclaration = windowProcessFunction.useWindowStates(); + this.operator = operator; + this.windowSerializer = windowSerializer; + this.isMergingWindow = isMergingWindow; + } + + private boolean isStateDeclared(StateDeclaration stateDeclaration) { + if (!declaredWindowStateDeclaration.contains(stateDeclaration)) { + LOG.warn( + "Fail to get window state for " + + stateDeclaration.getName() + + ", please declare the used state in the `WindowProcessFunction#useWindowStates` method first."); + return false; + } + return true; + } + + private boolean stateRedistributionModeIsNone(StateDeclaration stateDeclaration) { + StateDeclaration.RedistributionMode redistributionMode = + stateDeclaration.getRedistributionMode(); + return redistributionMode == StateDeclaration.RedistributionMode.NONE; + } Review 
Comment: ```suggestion private boolean stateRedistributionModeIsNotNone(StateDeclaration stateDeclaration) { StateDeclaration.RedistributionMode redistributionMode = stateDeclaration.getRedistributionMode(); return redistributionMode != StateDeclaration.RedistributionMode.NONE; } ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.extension.window.context; + +import org.apache.flink.api.common.state.ListStateDeclaration; +import org.apache.flink.api.common.state.MapStateDeclaration; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.MapStateDescriptor; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.state.v2.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.datastream.api.extension.window.context.WindowContext; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator; +import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor; +import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class provides methods to store and retrieve state associated with windows in {@link + * WindowContext}. + * + * @param <K> Type of the window key. + * @param <W> Type of the window. + */ +public class WindowStateStore<K, W extends Window> { + + private static final Logger LOG = LoggerFactory.getLogger(WindowStateStore.class); + + /** User-defined {@link WindowProcessFunction}. 
*/ + private final WindowProcessFunction windowProcessFunction; + + /** + * The {@link StateDeclaration}s that have been declared by the {@link + * WindowProcessFunction#useWindowStates()}. + */ + private final Set<StateDeclaration> declaredWindowStateDeclaration; + + /** The operator to which the window belongs, used for creating and retrieving window state. */ + private final AbstractAsyncStateStreamOperator<?> operator; + + private final TypeSerializer<W> windowSerializer; + + /** Whether the window is a merging window. */ + private final boolean isMergingWindow; + + public WindowStateStore( + WindowProcessFunction windowProcessFunction, + AbstractAsyncStateStreamOperator<?> operator, + TypeSerializer<W> windowSerializer, + boolean isMergingWindow) { + this.windowProcessFunction = windowProcessFunction; + this.declaredWindowStateDeclaration = windowProcessFunction.useWindowStates(); + this.operator = operator; + this.windowSerializer = windowSerializer; + this.isMergingWindow = isMergingWindow; + } + + private boolean isStateDeclared(StateDeclaration stateDeclaration) { + if (!declaredWindowStateDeclaration.contains(stateDeclaration)) { + LOG.warn( + "Fail to get window state for " + + stateDeclaration.getName() + + ", please declare the used state in the `WindowProcessFunction#useWindowStates` method first."); + return false; + } + return true; + } + + private boolean stateRedistributionModeIsNone(StateDeclaration stateDeclaration) { + StateDeclaration.RedistributionMode redistributionMode = + stateDeclaration.getRedistributionMode(); + return redistributionMode == StateDeclaration.RedistributionMode.NONE; + } + + /** Retrieve window state of list type. 
*/ + public <T> Optional<ListState<T>> getWindowState( + ListStateDeclaration<T> stateDeclaration, W namespace) { + checkState( + !isMergingWindow, + "Retrieving the window state is not permitted when using merging windows, such as session windows."); + + if (!isStateDeclared(stateDeclaration)) { + return Optional.empty(); + } + + if (!stateRedistributionModeIsNone(stateDeclaration)) { + throw new UnsupportedOperationException( + "RedistributionMode " + + stateDeclaration.getRedistributionMode().name() + + " is not supported for window state."); + } + + ListStateDescriptor<T> stateDescriptor = + new ListStateDescriptor<T>( + stateDeclaration.getName(), + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); + + try { + ListStateAdaptor<K, W, T> state = + operator.getOrCreateKeyedState(namespace, windowSerializer, stateDescriptor); + state.setCurrentNamespace(namespace); + return Optional.of(state); + } catch (Exception e) { + return Optional.empty(); + } + } + + /** Retrieve window state of map type. 
*/ + public <KEY, V> Optional<MapState<KEY, V>> getWindowState( + MapStateDeclaration<KEY, V> stateDeclaration, W namespace) { + checkState( + !isMergingWindow, + "Retrieving the window state is not permitted when using merging windows, such as session windows."); + + if (!isStateDeclared(stateDeclaration)) { + return Optional.empty(); + } + + if (!stateRedistributionModeIsNone(stateDeclaration)) { + throw new UnsupportedOperationException( + "RedistributionMode " + + stateDeclaration.getRedistributionMode().name() + + " is not supported for window state."); + } + + MapStateDescriptor<KEY, V> stateDescriptor = + new MapStateDescriptor<KEY, V>( + stateDeclaration.getName(), + TypeExtractor.createTypeInfo( + stateDeclaration.getKeyTypeDescriptor().getTypeClass()), + TypeExtractor.createTypeInfo( + stateDeclaration.getValueTypeDescriptor().getTypeClass())); + + try { + MapStateAdaptor<K, W, KEY, V> state = + operator.getOrCreateKeyedState(namespace, windowSerializer, stateDescriptor); + state.setCurrentNamespace(namespace); + return Optional.of(state); + } catch (Exception e) { + return Optional.empty(); + } + } + + /** Retrieve window state of value type. 
*/ + public <T> Optional<ValueState<T>> getWindowState( + ValueStateDeclaration<T> stateDeclaration, W namespace) { + checkState( + !isMergingWindow, + "Retrieving the window state is not permitted when using merging windows, such as session windows."); + + if (!isStateDeclared(stateDeclaration)) { + return Optional.empty(); + } + + if (!stateRedistributionModeIsNone(stateDeclaration)) { + throw new UnsupportedOperationException( + "RedistributionMode " + + stateDeclaration.getRedistributionMode().name() + + " is not supported for window state."); + } + + ValueStateDescriptor<T> stateDescriptor = + new ValueStateDescriptor<T>( + stateDeclaration.getName(), + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); + + try { + ValueStateAdaptor<K, W, T> state = + operator.getOrCreateKeyedState(namespace, windowSerializer, stateDescriptor); + state.setCurrentNamespace(namespace); + return Optional.of(state); + } catch (Exception e) { + return Optional.empty(); + } + } Review Comment: The overloads for reducing and aggregating window state are missing; only list, map and value state are covered here.
########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -118,16 +130,50 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process( TypeInformation<OUT> outType; outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); - KeyedProcessOperator<K, V, OUT> operator = new KeyedProcessOperator<>(processFunction); - Transformation<OUT> transform = - StreamUtils.getOneInputKeyedTransformation( - "KeyedProcess", this, outType, operator, keySelector, keyType); + Transformation<OUT> transform; + Configuration configuration = getEnvironment().getConfiguration(); + + if (processFunction instanceof InternalOneInputWindowStreamProcessFunction) { + transform = transformWindow(outType, processFunction); + } else { + KeyedProcessOperator<K, V, OUT> operator = new KeyedProcessOperator<>(processFunction); + transform = + StreamUtils.getOneInputKeyedTransformation( + "KeyedProcess", this, outType, operator, keySelector, keyType); + } transform.setAttribute(AttributeParser.parseAttribute(processFunction)); environment.addOperator(transform); return StreamUtils.wrapWithConfigureHandle( new NonKeyedPartitionStreamImpl<>(environment, transform)); } + public <OUT> Transformation<OUT> transformWindow( + TypeInformation<OUT> outType, OneInputStreamProcessFunction<V, OUT> processFunction) { + Transformation<OUT> transform; + if (processFunction instanceof InternalOneInputWindowStreamProcessFunction) { + InternalOneInputWindowStreamProcessFunction<V, OUT, ?> internalWindowFunction = + (InternalOneInputWindowStreamProcessFunction<V, OUT, ?>) processFunction; + WindowAssigner<V, ?> assigner = internalWindowFunction.getAssigner(); + ListStateDescriptor<V> stateDesc = Review Comment: This `ListStateDescriptor` is deprecated. I think the one we needed is `org.apache.flink.api.common.state.v2.ListStateDescriptor` instead of `org.apache.flink.runtime.state.v2.ListStateDescriptor`. 
Please double-check this, and also check all the other types of descriptors. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/WindowUtils.java: ########## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.datastream.impl.extension.window.utils; + +import org.apache.flink.datastream.api.extension.window.strategy.GlobalWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.SessionWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.SlidingTimeWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.TumblingTimeWindowStrategy; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** Some utils used in Window extension. */ +public class WindowUtils { Review Comment: ```suggestion public final class WindowUtils { ``` ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/function/InternalOneInputWindowStreamProcessFunction.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.function; + +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; + +import java.util.Set; + +/** + * A class that wrap a {@link OneInputWindowStreamProcessFunction} to internal processing. Review Comment: ```suggestion * A class that wraps a {@link OneInputWindowStreamProcessFunction} as a process function. This will be translated to a window operator instead of a vanilla process operator. ``` Please also update the other variants of `InternalXXXWindowStreamProcessFunction`.
########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/TaggedUnion.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.utils; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** Util class for implementing two input window operation. */ +@Internal +public class TaggedUnion<T1, T2> { Review Comment: Will the data that goes into the two-input window really be converted to `TaggedUnion`? What is the purpose of introducing this class?
########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java: ########## @@ -73,9 +80,27 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process( TypeInformation<OUT> outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); - ProcessOperator<T, OUT> operator = new ProcessOperator<>(processFunction); - OneInputTransformation<T, OUT> outputTransform = - StreamUtils.getOneInputTransformation("Process", this, outType, operator); + OneInputTransformation<T, OUT> outputTransform; Review Comment: I think that for methods which don't support windows, for example: ```public <T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> connectAndProcess( BroadcastStream<T_OTHER> other, TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction) ``` we should check that the `TwoInputBroadcastStreamProcessFunction` is not an `InternalTwoInputNonBroadcastWindowStreamProcessFunction`. Otherwise it may produce unexpected behavior, because it is a valid `TwoInputNonBroadcastStreamProcessFunction` but has a dummy `processRecord` implementation. Please also check this for the other streams. ########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java: ########## @@ -118,16 +130,50 @@ public <OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> process( TypeInformation<OUT> outType; outType = StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType()); - KeyedProcessOperator<K, V, OUT> operator = new KeyedProcessOperator<>(processFunction); - Transformation<OUT> transform = - StreamUtils.getOneInputKeyedTransformation( - "KeyedProcess", this, outType, operator, keySelector, keyType); + Transformation<OUT> transform; + Configuration configuration = getEnvironment().getConfiguration(); Review Comment: What is this `configuration` variable for? It does not seem to be used anywhere.
########## flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/operators/OneInputWindowProcessOperator.java: ########## @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.extension.window.operators; + +import org.apache.flink.api.common.state.v2.AppendingState; +import org.apache.flink.api.common.state.v2.ListStateDescriptor; +import org.apache.flink.api.common.state.v2.StateDescriptor; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; +import org.apache.flink.datastream.impl.extension.window.context.DefaultOneInputWindowContext; +import org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext; +import 
org.apache.flink.datastream.impl.extension.window.function.InternalOneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.impl.extension.window.utils.WindowUtils; +import org.apache.flink.datastream.impl.operators.BaseKeyedProcessOperator; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.v2.internal.InternalAppendingState; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.runtime.state.v2.internal.InternalMergingState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Operator for {@link OneInputWindowStreamProcessFunction} in {@link KeyedPartitionStream}. */ +public class OneInputWindowProcessOperator<K, IN, OUT, W extends Window> Review Comment: I suggest abstracting the common logic from all the `XXXWindowOperator` classes so that the code can be reused. DS V2 is currently in the experimental stage and the code freeze is approaching, so it's fine to do this refactoring in the next release. We can also do the same for V1's window operator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org