reswqa commented on code in PR #26001:
URL: https://github.com/apache/flink/pull/26001#discussion_r1919639049


##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets two input windows, 
such as in a join
+ * operation.
+ *
+ * @param <IN1> The type of the input1 value.
+ * @param <IN2> The type of the input2 value.
+ * @param <OUT> The type of the output value.
+ */
+@Experimental
+public interface TwoInputNonNroadcastWindowStreamProcessFunction<IN1, IN2, OUT>
+        extends WindowProcessFunction {
+
+    /**
+     * The {@link #onRecord1} method will be invoked when a record is received from input1. Its

Review Comment:
   We should avoid linking to the method itself in Javadoc.
   
   It can be replaced by `This method`. Please also check this PR for other similar issues.
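   
   As a rough illustration of the wording (a sketch only; `RecordHandler` and its `buffer` parameter are hypothetical stand-ins, not types from this PR), the Javadoc can describe the method without a `{@link}` to itself:
   
   ```java
   import java.util.List;

   /** Hypothetical stand-in for a two-input window function; not part of the PR. */
   interface RecordHandler<IN> {

       /**
        * This method is invoked when a record is received from input1. Its default behavior is to
        * append the record to the given buffer.
        */
       default void onRecord1(IN record, List<IN> buffer) {
           buffer.add(record);
       }
   }
   ```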



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets two input windows, 
such as in a join
+ * operation.
+ *
+ * @param <IN1> The type of the input1 value.
+ * @param <IN2> The type of the input2 value.
+ * @param <OUT> The type of the output value.
+ */
+@Experimental
+public interface TwoInputNonNroadcastWindowStreamProcessFunction<IN1, IN2, OUT>

Review Comment:
   ```suggestion
   public interface TwoInputNonBroadcastWindowStreamProcessFunction<IN1, IN2, OUT>
   ```
   
   :)



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/WindowProcessFunction.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.datastream.api.function.ProcessFunction;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Base interface for functions evaluated over windows, providing callback 
functions for various
+ * stages of the window's lifecycle.
+ */
+@Experimental
+public interface WindowProcessFunction extends ProcessFunction {
+
+    /**
+     * Explicitly declares states that are bound to the window upfront. Each specific window state

Review Comment:
   ```suggestion
        * Explicitly declares states that are bound to the window. Each specific window state
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoInputNonNroadcastWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.TwoInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets two input windows, 
such as in a join
+ * operation.
+ *

Review Comment:
   ```suggestion
   /**
    * A type of {@link WindowProcessFunction} for two input window processing, such as window-join.
    *
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(Duration windowSize, Duration 
windowSlideInterval) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize, Duration windowSlideInterval, TimeType 
timeType) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, 
timeType);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/OneInputWindowContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+
+/**
+ * The {@link OneInputWindowContext} interface extends {@link WindowContext} 
and provides additional
+ * functionality for writing and reading window data of one input window.
+ *
+ * @param <IN> Type of the input elements
+ */
+@Experimental
+public interface OneInputWindowContext<IN> extends WindowContext {
+
+    /** Write records into the window's state. */
+    void putRecord(IN record);
+
+    /**
+     * Read records from the window's state, note that this cloud be null if the window is empty.

Review Comment:
   Which cloud? Alibaba Cloud? ☁️ ☁️ ☁️ 😉 (Typo: `cloud` should be `could`.)
   
   Please also check `TwoInputWindowContext`.
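   
   Since the Javadoc says the result may be null for an empty window, callers would need a guard. A minimal sketch, assuming the records are exposed as an `Iterable` (the quoted hunk does not show the return type; `WindowRecords` and `toList` are hypothetical names):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical helper; assumes the window's records arrive as an Iterable that may be null. */
   final class WindowRecords {

       private WindowRecords() {}

       /** Copies the records into a list, treating a null Iterable as an empty window. */
       static <T> List<T> toList(Iterable<T> records) {
           List<T> result = new ArrayList<>();
           if (records == null) {
               return result; // empty window: nothing was stored
           }
           for (T record : records) {
               result.add(record);
           }
           return result;
       }
   }
   ```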



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SlidingTimeWindowStrategy.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate sliding TimeWindow. */
+@Experimental
+public class SlidingTimeWindowStrategy extends WindowStrategy {
+    private Duration windowSize;
+    private Duration windowSlideInterval;
+    private TimeType timeType;
+    private Duration allowedLateness;

Review Comment:
   These fields can be final.
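   
   A minimal sketch of what that could look like, using a simplified stand-in class (`SlidingSpec` is hypothetical and does not extend `WindowStrategy`; the event-time default follows the `WindowStrategy` Javadoc quoted in this review, while the zero allowed lateness is an assumption):
   
   ```java
   import java.time.Duration;

   /** Simplified, hypothetical stand-in for SlidingTimeWindowStrategy; not the PR's code. */
   final class SlidingSpec {

       enum TimeType {
           PROCESSING,
           EVENT
       }

       // Every field is assigned exactly once in the constructor, so each can be final.
       private final Duration windowSize;
       private final Duration windowSlideInterval;
       private final TimeType timeType;
       private final Duration allowedLateness;

       SlidingSpec(Duration windowSize, Duration windowSlideInterval) {
           // Zero lateness is an assumed default, chosen only for this sketch.
           this(windowSize, windowSlideInterval, TimeType.EVENT, Duration.ZERO);
       }

       SlidingSpec(
               Duration windowSize,
               Duration windowSlideInterval,
               TimeType timeType,
               Duration allowedLateness) {
           this.windowSize = windowSize;
           this.windowSlideInterval = windowSlideInterval;
           this.timeType = timeType;
           this.allowedLateness = allowedLateness;
       }

       Duration getWindowSize() {
           return windowSize;
       }

       Duration getWindowSlideInterval() {
           return windowSlideInterval;
       }

       TimeType getTimeType() {
           return timeType;
       }

       Duration getAllowedLateness() {
           return allowedLateness;
       }
   }
   ```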



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.window.context.WindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class provides methods to store and retrieve state associated with 
windows in {@link
+ * WindowContext}.
+ *
+ * @param <K> Type of the window key.
+ * @param <W> Type of the window.
+ */
+public class WindowStateStore<K, W extends Window> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WindowStateStore.class);
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final WindowProcessFunction windowProcessFunction;
+
+    /**
+     * The {@link StateDeclaration}s that have been declared by the {@link
+     * WindowProcessFunction#useWindowStates()}.
+     */
+    private final Set<StateDeclaration> declaredWindowStateDeclaration;

Review Comment:
   ```suggestion
       private final Set<StateDeclaration> windowStateDeclarations;
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/function/InternalOneInputWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.function;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.Set;
+
+/**
+ * A class that wrap a {@link OneInputWindowStreamProcessFunction} to internal 
processing.
+ *
+ * @param <IN> Type of the input elements.
+ * @param <OUT> Type of the output elements.
+ * @param <W> Type of the window.
+ */
+public class InternalOneInputWindowStreamProcessFunction<IN, OUT, W extends 
Window>
+        implements OneInputStreamProcessFunction<IN, OUT> {
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final OneInputWindowStreamProcessFunction<IN, OUT> 
windowProcessFunction;
+
+    private final WindowAssigner<IN, W> assigner;
+
+    private final Trigger<IN, W> trigger;
+
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the {@code window.maxTimestamp

Review Comment:
   ```suggestion
        *   <li>Clearing the state of a window if the time out-of the {@code window.maxTimestamp
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowTriggerContext.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * {@code WindowTriggerContext} is a utility for handling {@code Trigger} 
invocations. It can be
+ * reused by setting the {@code key} and {@code window} fields. No internal 
state must be kept in
+ * the {@code WindowTriggerContext}

Review Comment:
   What does `No internal state must be kept in the {@code WindowTriggerContext}` mean?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is not a time window
+     *     or a session window.

Review Comment:
   ```suggestion
        * @return The exclusive end timestamp of this window, or -1 if the window is
        *     a session window or not a time window.
   ```
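   
   For callers, the -1 sentinel means the bounds need checking before use. A small sketch (`WindowBounds` and its methods are hypothetical helpers, not part of this PR; the length is in whatever unit the timestamps use):
   
   ```java
   /** Hypothetical helper illustrating the -1 sentinel described in the Javadoc. */
   final class WindowBounds {

       private WindowBounds() {}

       /** Returns true only when both timestamps are real bounds rather than the -1 sentinel. */
       static boolean hasTimeBounds(long startTime, long endTime) {
           return startTime != -1 && endTime != -1;
       }

       /** Length of the window, or -1 when the window reports no time bounds. */
       static long length(long startTime, long endTime) {
           return hasTimeBounds(startTime, endTime) ? endTime - startTime : -1;
       }
   }
   ```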



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.

Review Comment:
   ```suggestion
        * @return The starting timestamp of this window, or -1 if the window is a
        *     session window or not a time window.
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/TwoOutputWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.TwoOutputPartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets two output windows.

Review Comment:
   ```suggestion
    * A type of {@link WindowProcessFunction} for two-output window processing.
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/builtin/BuiltinFuncs.java:
##########
@@ -121,4 +127,89 @@ public static <KEY, T1, T2, OUT> 
NonKeyedPartitionStream<OUT> join(
                 joinFunction,
                 joinType);
     }
+
+    // =================== Window ===========================
+
+    static final Class<?> WINDOW_FUNCS_INSTANCE;
+
+    static {
+        try {
+            WINDOW_FUNCS_INSTANCE =
+                    
Class.forName("org.apache.flink.datastream.impl.builtin.BuiltinWindowFuncs");
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Please ensure that flink-datastream in 
your class path");
+        }
+    }
+
+    /**
+     * Wrap the WindowStrategy and OneInputWindowStreamProcessFunction within a
+     * OneInputStreamProcessFunction to perform the window operation.
+     *
+     * @param windowStrategy the window strategy
+     * @param windowProcessFunction the window process function
+     * @return the wrapped process function
+     */
+    public static <IN, OUT> OneInputStreamProcessFunction<IN, OUT> window(
+            WindowStrategy windowStrategy,
+            OneInputWindowStreamProcessFunction<IN, OUT> 
windowProcessFunction) {
+        try {
+            return (OneInputStreamProcessFunction<IN, OUT>)
+                    WINDOW_FUNCS_INSTANCE
+                            .getMethod(
+                                    "window",
+                                    WindowStrategy.class,
+                                    OneInputWindowStreamProcessFunction.class)
+                            .invoke(null, windowStrategy, 
windowProcessFunction);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Wrap the WindowStrategy and TwoInputNonNroadcastWindowStreamProcessFunction within a

Review Comment:
   ```suggestion
        * Wrap the WindowStrategy and TwoInputNonBroadcastWindowStreamProcessFunction within a
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets one input windows.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ */
+@Experimental
+public interface OneInputWindowStreamProcessFunction<IN, OUT> extends 
WindowProcessFunction {
+
+    /**
+     * The {@link #onRecord} method will be invoked when a record is received. Its default behavior
+     * is to store data in built-in window state by {@code WindowContext#putRecord}. If the user
+     * overrides this method, they will need to update the window state as necessary.

Review Comment:
   ```suggestion
        * The {@link #onRecord} method will be invoked when a record is received. Its default behavior
        * is to store data in built-in window state by {@link OneInputWindowContext#putRecord}. If the user
        * overrides this method, they have to take care of the input data themselves.
   ```
   
   The same applies to the other `XXXWindowStreamProcessFunction` interfaces.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/function/OneInputWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
+
+/**
+ * A type of {@link WindowProcessFunction} that targets one input windows.

Review Comment:
   ```suggestion
    * A type of {@link WindowProcessFunction} for one-input window processing.
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java:
##########
@@ -70,13 +74,26 @@ public <OUT> ProcessConfigurableAndGlobalStream<OUT> 
process(
 
         TypeInformation<OUT> outType =
                 
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
-        ProcessOperator<T, OUT> operator = new 
ProcessOperator<>(processFunction);
-        return StreamUtils.wrapWithConfigureHandle(
-                transform(
-                        "Global Process",
-                        outType,
-                        operator,
-                        AttributeParser.parseAttribute(processFunction)));
+
+        if (processFunction instanceof InternalOneInputWindowStreamProcessFunction) {
+            // Transform to keyed stream.
+            KeyedPartitionStreamImpl<Byte, T> keyedStream =

Review Comment:
   Why do we construct a keyed stream first and then convert it to a global stream?
   
   It feels sort of weird. If you just want to use the `transformWindow` method
   on the keyed stream, why not extract it into `StreamUtils`?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time type. Note that
+     * tumbling time windows can only be used in KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is not a time window
+     *     or a session window.
+     */
+    long getEndTime();
+
+    /**
+     * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.

Review Comment:
   ```suggestion
        * Retrieves a {@link ListState} object that can be used to interact with fault-tolerant state
        * that is scoped to the window.
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -247,6 +299,50 @@ public <OUT1, OUT2> 
ProcessConfigurableAndTwoNonKeyedPartitionStream<OUT1, OUT2>
                 environment, firstTransformation, firstStream, secondStream);
     }
 
+    public <OUT1, OUT2> Transformation<OUT1> transformTwoOutputWindow(
+            Tuple2<TypeInformation<OUT1>, TypeInformation<OUT2>> twoOutputType,
+            OutputTag<OUT2> secondOutputTag,
+            TwoOutputStreamProcessFunction<V, OUT1, OUT2> processFunction) {
+        Transformation<OUT1> transform;
+        TypeInformation<OUT1> firstOutputType = twoOutputType.f0;
+        TypeInformation<OUT2> secondOutputType = twoOutputType.f1;
+        if (processFunction instanceof 
InternalTwoOutputWindowStreamProcessFunction) {
+            InternalTwoOutputWindowStreamProcessFunction<V, OUT1, OUT2, ?> 
internalWindowFunction =
+                    (InternalTwoOutputWindowStreamProcessFunction<V, OUT1, 
OUT2, ?>)
+                            processFunction;
+            WindowAssigner<V, ?> assigner = 
internalWindowFunction.getAssigner();
+            ListStateDescriptor<V> stateDesc =
+                    new ListStateDescriptor<>(
+                            "window-iterator-state",
+                            
TypeExtractor.createTypeInfo(getType().getTypeClass()));
+
+            TwoOutputWindowProcessOperator windowProcessOperator =
+                    new TwoOutputWindowProcessOperator(
+                            internalWindowFunction,
+                            secondOutputTag,
+                            null,
+                            null,
+                            assigner,
+                            internalWindowFunction.getTrigger(),
+                            
assigner.getWindowSerializer(environment.getExecutionConfig()),
+                            stateDesc,
+                            internalWindowFunction.getAllowedLateness());
+            transform =
+                    StreamUtils.getOneInputKeyedTransformation(
+                            "Two-Output-Process",
+                            this,
+                            firstOutputType,
+                            windowProcessOperator,
+                            keySelector,
+                            keyType);
+            //            transform = oneInputTransformWithOperator("Window", firstOutputType,
+            // windowProcessOperator);

Review Comment:
   ?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is not a time window
+     *     or a session window.
+     */
+    long getEndTime();
+
+    /**
+     * Retrieves a {@link ListState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.

Review Comment:
   ```suggestion
        * Retrieves a {@link MapState} object that can be used to interact with fault-tolerant state
        * that is scoped to the window.
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -424,6 +563,31 @@ public <T_OTHER, OUT> Transformation<OUT> 
getJoinTransformation(
                 joinProcessOperator);
     }
 
+    private <R> Transformation<R> oneInputTransformWithOperator(

Review Comment:
   Can this be replaced by `StreamUtils.getOneInputKeyedTransformation`?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(Duration windowSize, Duration 
windowSlideInterval) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows can only be used in
+     * KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is not a time window
+     *     or a session window.
+     */
+    long getEndTime();
+
+    /**
+     * Retrieves a {@link ListState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link MapState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <KEY, V> Optional<MapState<KEY, V>> 
getWindowState(MapStateDeclaration<KEY, V> stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.

Review Comment:
   ```suggestion
        * Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant state
        * that is scoped to the window.
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.window.context.WindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class provides methods to store and retrieve state associated with 
windows in {@link
+ * WindowContext}.
+ *
+ * @param <K> Type of the window key.
+ * @param <W> Type of the window.
+ */
+public class WindowStateStore<K, W extends Window> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WindowStateStore.class);
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final WindowProcessFunction windowProcessFunction;
+
+    /**
+     * The {@link StateDeclaration}s that have been declared by the {@link
+     * WindowProcessFunction#useWindowStates()}.
+     */
+    private final Set<StateDeclaration> declaredWindowStateDeclaration;
+
+    /** The operator to which the window belongs, used for creating and 
retrieving window state. */
+    private final AbstractAsyncStateStreamOperator<?> operator;
+
+    private final TypeSerializer<W> windowSerializer;
+
+    /** Whether the window is a merging window. */
+    private final boolean isMergingWindow;
+
+    public WindowStateStore(
+            WindowProcessFunction windowProcessFunction,
+            AbstractAsyncStateStreamOperator<?> operator,
+            TypeSerializer<W> windowSerializer,
+            boolean isMergingWindow) {
+        this.windowProcessFunction = windowProcessFunction;
+        this.declaredWindowStateDeclaration = 
windowProcessFunction.useWindowStates();
+        this.operator = operator;
+        this.windowSerializer = windowSerializer;
+        this.isMergingWindow = isMergingWindow;
+    }
+
+    private boolean isStateDeclared(StateDeclaration stateDeclaration) {
+        if (!declaredWindowStateDeclaration.contains(stateDeclaration)) {
+            LOG.warn(
+                    "Fail to get window state for "
+                            + stateDeclaration.getName()
+                            + ", please declare the used state in the 
`WindowProcessFunction#useWindowStates` method first.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean stateRedistributionModeIsNone(StateDeclaration 
stateDeclaration) {
+        StateDeclaration.RedistributionMode redistributionMode =
+                stateDeclaration.getRedistributionMode();
+        return redistributionMode == StateDeclaration.RedistributionMode.NONE;
+    }

Review Comment:
   I think we should invert this method's semantics, because we always check its
   negation.
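   
   As a rough illustration only, the inverted predicate might look like the
   sketch below. The enum is a stand-in for `StateDeclaration.RedistributionMode`,
   and both the method name `stateRedistributionModeIsNotNone` and the constant
   `SOME_OTHER_MODE` are hypothetical names chosen for this example, not taken
   from the PR.
   
   ```java
   final class RedistributionModeCheckSketch {
   
       /** Stand-in for StateDeclaration.RedistributionMode; only NONE comes from the PR. */
       enum RedistributionMode {
           NONE,
           SOME_OTHER_MODE // hypothetical placeholder for any non-NONE mode
       }
   
       /** Inverted predicate: true when the mode is anything other than NONE. */
       static boolean stateRedistributionModeIsNotNone(RedistributionMode mode) {
           return mode != RedistributionMode.NONE;
       }
   
       public static void main(String[] args) {
           // Call sites can now test the condition directly instead of negating the result.
           for (RedistributionMode mode : RedistributionMode.values()) {
               System.out.println(mode + " -> " + stateRedistributionModeIsNotNone(mode));
           }
       }
   }
   ```
   
   With this shape, a caller that used to write `!stateRedistributionModeIsNone(decl)`
   would call the positive form instead, which reads more directly.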



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows can only be used in
+     * KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/context/WindowContext.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.context;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.ValueState;
+
+import java.util.Optional;
+
+/**
+ * The {@link WindowContext} interface represents a context for window 
operations and provides
+ * methods to interact with state that is scoped to the window.
+ */
+@Experimental
+public interface WindowContext {
+
+    /**
+     * Gets the starting timestamp of the window. This is the first timestamp 
that belongs to this
+     * window.
+     *
+     * @return The starting timestamp of this window, or -1 if the window is 
not a time window or a
+     *     session window.
+     */
+    long getStartTime();
+
+    /**
+     * Gets the end timestamp of this window. The end timestamp is exclusive, 
meaning it is the
+     * first timestamp that does not belong to this window anymore.
+     *
+     * @return The exclusive end timestamp of this window, or -1 if the window 
is not a time window
+     *     or a session window.
+     */
+    long getEndTime();
+
+    /**
+     * Retrieves a {@link ListState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <T> Optional<ListState<T>> getWindowState(ListStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link MapState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <KEY, V> Optional<MapState<KEY, V>> 
getWindowState(MapStateDeclaration<KEY, V> stateDeclaration)
+            throws Exception;
+
+    /**
+     * Retrieves a {@link ValueState} object that can be used to interact with 
fault-tolerant state
+     * that is scoped to the window and key of the current trigger invocation.
+     */
+    <T> Optional<ValueState<T>> getWindowState(ValueStateDeclaration<T> 
stateDeclaration)
+            throws Exception;
+}

Review Comment:
   It seems that `ReducingStateDeclaration` and `AggregatingStateDeclaration` were missed.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */

Review Comment:
   The doc here is not precise enough. IMO, `WindowStrategy` describes what kind of window to use, including the strategies for dividing the stream into windows, triggering them, and clearing them.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(Duration windowSize, Duration 
windowSlideInterval) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize, Duration windowSlideInterval, TimeType 
timeType) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, 
timeType);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize,
+            Duration windowSlideInterval,
+            TimeType timeType,
+            Duration allowedLateness) {
+        return new SlidingTimeWindowStrategy(
+                windowSize, windowSlideInterval, timeType, allowedLateness);
+    }
+
+    // ============== session window ================
+
+    /**
+     * Create a session time window strategy with the event time default time 
type. Note that
+     * session time windows can only be used in KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate tumbling TimeWindow. */
+@Experimental
+public class TumblingTimeWindowStrategy extends WindowStrategy {
+    private Duration windowSize;
+    private TimeType timeType;
+    private Duration allowedLateness;

Review Comment:
   Can be final.
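   
   A minimal sketch of what this could look like, assuming every constructor ends up assigning all three fields. `TumblingStrategySketch` is a simplified, hypothetical stand-in (it does not extend `WindowStrategy`), and the `Duration.ZERO` default for `allowedLateness` is my assumption. The event-time default follows the Javadoc of `WindowStrategy#tumbling(Duration)`.
   
   ```java
   import java.time.Duration;

   // Hypothetical stand-in for TumblingTimeWindowStrategy; not the PR's actual class.
   final class TumblingStrategySketch {
       enum TimeType { PROCESSING, EVENT }

       // Each field is assigned exactly once via the constructor chain, so it can be final.
       private final Duration windowSize;
       private final TimeType timeType;
       private final Duration allowedLateness;

       TumblingStrategySketch(Duration windowSize) {
           // Event time is the documented default time type.
           this(windowSize, TimeType.EVENT);
       }

       TumblingStrategySketch(Duration windowSize, TimeType timeType) {
           // Assumption: no allowed lateness unless the caller asks for it.
           this(windowSize, timeType, Duration.ZERO);
       }

       TumblingStrategySketch(Duration windowSize, TimeType timeType, Duration allowedLateness) {
           this.windowSize = windowSize;
           this.timeType = timeType;
           this.allowedLateness = allowedLateness;
       }

       Duration getWindowSize() { return windowSize; }

       TimeType getTimeType() { return timeType; }

       Duration getAllowedLateness() { return allowedLateness; }
   }
   ```
   
   Chaining the shorter constructors to the full one keeps the defaults in one place and lets the compiler verify that each field is set exactly once.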



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.

Review Comment:
   We should not assume that users already understand when each type of window is assigned, triggered, and cleaned up.
   
   Could you elaborate on these details in the Javadoc for each window type?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can only be used in KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/WindowStrategy.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/** The WindowStrategy defines how to generate Window in the stream. */
+@Experimental
+public class WindowStrategy implements Serializable {
+
+    public static final TimeType PROCESSING_TIME = TimeType.PROCESSING;
+    public static final TimeType EVENT_TIME = TimeType.EVENT;
+
+    /** The types of time used in window operations. */
+    @Experimental
+    public enum TimeType {
+        PROCESSING,
+        EVENT
+    }
+
+    // ============== global window ================
+
+    /**
+     * Creates a global window strategy. Note that the global window can be 
used in both
+     * GlobalStream, KeyedStream, NonKeyedStream.
+     *
+     * @return A global window strategy.
+     */
+    public static WindowStrategy global() {
+        return new GlobalWindowStrategy();
+    }
+
+    // ============== tumbling time window ================
+
+    /**
+     * Create a tumbling time window strategy with the event time default time 
type. Note that
+     * tumbling time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize) {
+        return new TumblingTimeWindowStrategy(windowSize);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(Duration windowSize, TimeType 
timeType) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType);
+    }
+
+    /**
+     * Create a tumbling time window strategy. Note that tumbling time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A tumbling time window strategy.
+     */
+    public static WindowStrategy tumbling(
+            Duration windowSize, TimeType timeType, Duration allowedLateness) {
+        return new TumblingTimeWindowStrategy(windowSize, timeType, 
allowedLateness);
+    }
+
+    // ============== sliding time window ================
+
+    /**
+     * Create a sliding time window strategy with the event time default time 
type. Note that
+     * sliding time windows can only be used in KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(Duration windowSize, Duration 
windowSlideInterval) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize, Duration windowSlideInterval, TimeType 
timeType) {
+        return new SlidingTimeWindowStrategy(windowSize, windowSlideInterval, 
timeType);
+    }
+
+    /**
+     * Create a sliding time window strategy. Note that sliding time windows 
can only be used in
+     * KeyedStream.
+     *
+     * @param windowSize the size of Window.
+     * @param windowSlideInterval the slide interval of Window.
+     * @param timeType the time type of Window.
+     * @param allowedLateness the allowed lateness of Window.
+     * @return A sliding time window strategy.
+     */
+    public static WindowStrategy sliding(
+            Duration windowSize,
+            Duration windowSlideInterval,
+            TimeType timeType,
+            Duration allowedLateness) {
+        return new SlidingTimeWindowStrategy(
+                windowSize, windowSlideInterval, timeType, allowedLateness);
+    }
+
+    // ============== session window ================
+
+    /**
+     * Create a session time window strategy with the event time default time 
type. Note that
+     * session time windows can only be used in KeyedStream.
+     *
+     * @param sessionGap the timeout of session.
+     * @return A session window strategy.
+     */
+    public static WindowStrategy session(Duration sessionGap) {
+        return new SessionWindowStrategy(sessionGap);
+    }
+
+    /**
+     * Create a session time window strategy. Note that session time windows 
can only be used in
+     * KeyedStream.

Review Comment:
   How about GlobalStream?



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/WindowUtils.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.utils;
+
+import 
org.apache.flink.datastream.api.extension.window.strategy.GlobalWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.SessionWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.SlidingTimeWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.TumblingTimeWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import 
org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/** Some utils used in Window extension. */

Review Comment:
   ```suggestion
   /** Utilities for Window extension. */
   ```



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/SessionWindowStrategy.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate Session Windows. */
+@Experimental
+public class SessionWindowStrategy extends WindowStrategy {
+    private Duration sessionGap;
+    private TimeType timeType;

Review Comment:
   Can be final.



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/extension/window/strategy/TumblingTimeWindowStrategy.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.api.extension.window.strategy;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.time.Duration;
+
+/** A {@link WindowStrategy} used to generate tumbling TimeWindow. */

Review Comment:
   This is a public API, so we should provide more details about the tumbling window mechanism. The same applies to the other strategies.
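   
   For instance, the Javadoc could spell out how a record's timestamp maps to a window. A minimal sketch of the usual tumbling assignment, assuming epoch-aligned windows with no offset. `TumblingAssignmentSketch` and its method names are hypothetical and not code from this PR. The exclusive end is meant to match the `WindowContext#getEndTime` contract.
   
   ```java
   // Hypothetical illustration of tumbling window assignment; not code from this PR.
   final class TumblingAssignmentSketch {

       // Inclusive start of the window that contains the given timestamp.
       // floorMod keeps the result correct for negative timestamps as well.
       static long windowStart(long timestampMillis, long windowSizeMillis) {
           return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
       }

       // Exclusive end: the first timestamp that no longer belongs to the window.
       static long windowEnd(long timestampMillis, long windowSizeMillis) {
           return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
       }
   }
   ```
   
   With a 5-second window, a record with timestamp 12345 ms would be assigned to the window [10000, 15000). A similar sentence on when the window fires and when its state is cleared would cover the triggering and cleanup parts.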



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.window.context.WindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class provides methods to store and retrieve state associated with 
windows in {@link
+ * WindowContext}.
+ *
+ * @param <K> Type of the window key.
+ * @param <W> Type of the window.
+ */
+public class WindowStateStore<K, W extends Window> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WindowStateStore.class);
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final WindowProcessFunction windowProcessFunction;
+
+    /**
+     * The {@link StateDeclaration}s that have been declared by the {@link
+     * WindowProcessFunction#useWindowStates()}.
+     */
+    private final Set<StateDeclaration> declaredWindowStateDeclaration;
+
+    /** The operator to which the window belongs, used for creating and 
retrieving window state. */
+    private final AbstractAsyncStateStreamOperator<?> operator;
+
+    private final TypeSerializer<W> windowSerializer;
+
+    /** Whether the window is a merging window. */
+    private final boolean isMergingWindow;
+
+    public WindowStateStore(
+            WindowProcessFunction windowProcessFunction,
+            AbstractAsyncStateStreamOperator<?> operator,
+            TypeSerializer<W> windowSerializer,
+            boolean isMergingWindow) {
+        this.windowProcessFunction = windowProcessFunction;
+        this.declaredWindowStateDeclaration = 
windowProcessFunction.useWindowStates();
+        this.operator = operator;
+        this.windowSerializer = windowSerializer;
+        this.isMergingWindow = isMergingWindow;
+    }
+
+    private boolean isStateDeclared(StateDeclaration stateDeclaration) {
+        if (!declaredWindowStateDeclaration.contains(stateDeclaration)) {
+            LOG.warn(
+                    "Fail to get window state for "
+                            + stateDeclaration.getName()
+                            + ", please declare the used state in the 
`WindowProcessFunction#useWindowStates` method first.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean stateRedistributionModeIsNone(StateDeclaration 
stateDeclaration) {
+        StateDeclaration.RedistributionMode redistributionMode =
+                stateDeclaration.getRedistributionMode();
+        return redistributionMode == StateDeclaration.RedistributionMode.NONE;
+    }
+
+    /** Retrieve window state of list type. */
+    public <T> Optional<ListState<T>> getWindowState(
+            ListStateDeclaration<T> stateDeclaration, W namespace) {
+        checkState(
+                !isMergingWindow,
+                "Retrieving the window state is not permitted when using 
merging windows, such as session windows.");
+
+        if (!isStateDeclared(stateDeclaration)) {
+            return Optional.empty();
+        }
+
+        if (!stateRedistributionModeIsNone(stateDeclaration)) {
+            throw new UnsupportedOperationException(
+                    "RedistributionMode "
+                            + stateDeclaration.getRedistributionMode().name()
+                            + " is not supported for window state.");
+        }
+
+        ListStateDescriptor<T> stateDescriptor =
+                new ListStateDescriptor<T>(
+                        stateDeclaration.getName(),
+                        TypeExtractor.createTypeInfo(
+                                
stateDeclaration.getTypeDescriptor().getTypeClass()));
+
+        try {
+            ListStateAdaptor<K, W, T> state =
+                    operator.getOrCreateKeyedState(namespace, 
windowSerializer, stateDescriptor);
+            state.setCurrentNamespace(namespace);
+            return Optional.of(state);
+        } catch (Exception e) {
+            return Optional.empty();

Review Comment:
   The exception should be propagated to the caller. Otherwise, the user cannot 
distinguish between empty state and illegal access. The same applies to the other 
`getWindowState` implementations.
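
   To illustrate the difference, here is a minimal sketch that contrasts the two shapes. `StateLookupSketch` and `StateFactory` are hypothetical stand-ins rather than Flink classes, and how the real method should declare or wrap the exception is left open.

```java
import java.util.Optional;

/** Hypothetical stand-ins, not Flink classes: contrasts swallowing and propagating a failure. */
final class StateLookupSketch {

    /** Stand-in for the operator call that may fail while creating state. */
    interface StateFactory<S> {
        S getOrCreate(String name) throws Exception;
    }

    private StateLookupSketch() {}

    /** Current shape: a failure is indistinguishable from "state not available". */
    static <S> Optional<S> swallowing(StateFactory<S> factory, String name) {
        try {
            return Optional.of(factory.getOrCreate(name));
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    /** Suggested shape: the exception reaches the caller. */
    static <S> Optional<S> propagating(StateFactory<S> factory, String name) throws Exception {
        return Optional.of(factory.getOrCreate(name));
    }

    public static void main(String[] args) {
        StateFactory<String> failing = name -> {
            throw new IllegalStateException("cannot create state " + name);
        };
        // The failure is hidden behind an empty Optional.
        System.out.println(swallowing(failing, "count").isPresent());
        try {
            propagating(failing, "count");
        } catch (Exception e) {
            // The caller can now tell an illegal access apart from empty state.
            System.out.println("caller sees: " + e.getMessage());
        }
    }
}
```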



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.window.context.WindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class provides methods to store and retrieve state associated with 
windows in {@link
+ * WindowContext}.
+ *
+ * @param <K> Type of the window key.
+ * @param <W> Type of the window.
+ */
+public class WindowStateStore<K, W extends Window> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WindowStateStore.class);
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final WindowProcessFunction windowProcessFunction;
+
+    /**
+     * The {@link StateDeclaration}s that have been declared by the {@link
+     * WindowProcessFunction#useWindowStates()}.
+     */
+    private final Set<StateDeclaration> declaredWindowStateDeclaration;
+
+    /** The operator to which the window belongs, used for creating and 
retrieving window state. */
+    private final AbstractAsyncStateStreamOperator<?> operator;
+
+    private final TypeSerializer<W> windowSerializer;
+
+    /** Whether the window is a merging window. */
+    private final boolean isMergingWindow;
+
+    public WindowStateStore(
+            WindowProcessFunction windowProcessFunction,
+            AbstractAsyncStateStreamOperator<?> operator,
+            TypeSerializer<W> windowSerializer,
+            boolean isMergingWindow) {
+        this.windowProcessFunction = windowProcessFunction;
+        this.declaredWindowStateDeclaration = 
windowProcessFunction.useWindowStates();
+        this.operator = operator;
+        this.windowSerializer = windowSerializer;
+        this.isMergingWindow = isMergingWindow;
+    }
+
+    private boolean isStateDeclared(StateDeclaration stateDeclaration) {
+        if (!declaredWindowStateDeclaration.contains(stateDeclaration)) {
+            LOG.warn(
+                    "Fail to get window state for "
+                            + stateDeclaration.getName()
+                            + ", please declare the used state in the 
`WindowProcessFunction#useWindowStates` method first.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean stateRedistributionModeIsNone(StateDeclaration 
stateDeclaration) {
+        StateDeclaration.RedistributionMode redistributionMode =
+                stateDeclaration.getRedistributionMode();
+        return redistributionMode == StateDeclaration.RedistributionMode.NONE;
+    }

Review Comment:
   ```suggestion
       private boolean stateRedistributionModeIsNotNone(StateDeclaration stateDeclaration) {
           StateDeclaration.RedistributionMode redistributionMode =
                   stateDeclaration.getRedistributionMode();
           return redistributionMode != StateDeclaration.RedistributionMode.NONE;
       }
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/context/WindowStateStore.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.context;
+
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.MapStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.MapState;
+import org.apache.flink.api.common.state.v2.MapStateDescriptor;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.datastream.api.extension.window.context.WindowContext;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperator;
+import org.apache.flink.runtime.state.v2.adaptor.ListStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.MapStateAdaptor;
+import org.apache.flink.runtime.state.v2.adaptor.ValueStateAdaptor;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This class provides methods to store and retrieve state associated with 
windows in {@link
+ * WindowContext}.
+ *
+ * @param <K> Type of the window key.
+ * @param <W> Type of the window.
+ */
+public class WindowStateStore<K, W extends Window> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(WindowStateStore.class);
+
+    /** User-defined {@link WindowProcessFunction}. */
+    private final WindowProcessFunction windowProcessFunction;
+
+    /**
+     * The {@link StateDeclaration}s that have been declared by the {@link
+     * WindowProcessFunction#useWindowStates()}.
+     */
+    private final Set<StateDeclaration> declaredWindowStateDeclaration;
+
+    /** The operator to which the window belongs, used for creating and 
retrieving window state. */
+    private final AbstractAsyncStateStreamOperator<?> operator;
+
+    private final TypeSerializer<W> windowSerializer;
+
+    /** Whether the window is a merging window. */
+    private final boolean isMergingWindow;
+
+    public WindowStateStore(
+            WindowProcessFunction windowProcessFunction,
+            AbstractAsyncStateStreamOperator<?> operator,
+            TypeSerializer<W> windowSerializer,
+            boolean isMergingWindow) {
+        this.windowProcessFunction = windowProcessFunction;
+        this.declaredWindowStateDeclaration = 
windowProcessFunction.useWindowStates();
+        this.operator = operator;
+        this.windowSerializer = windowSerializer;
+        this.isMergingWindow = isMergingWindow;
+    }
+
+    private boolean isStateDeclared(StateDeclaration stateDeclaration) {
+        if (!declaredWindowStateDeclaration.contains(stateDeclaration)) {
+            LOG.warn(
+                    "Fail to get window state for "
+                            + stateDeclaration.getName()
+                            + ", please declare the used state in the 
`WindowProcessFunction#useWindowStates` method first.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean stateRedistributionModeIsNone(StateDeclaration 
stateDeclaration) {
+        StateDeclaration.RedistributionMode redistributionMode =
+                stateDeclaration.getRedistributionMode();
+        return redistributionMode == StateDeclaration.RedistributionMode.NONE;
+    }
+
+    /** Retrieve window state of list type. */
+    public <T> Optional<ListState<T>> getWindowState(
+            ListStateDeclaration<T> stateDeclaration, W namespace) {
+        checkState(
+                !isMergingWindow,
+                "Retrieving the window state is not permitted when using 
merging windows, such as session windows.");
+
+        if (!isStateDeclared(stateDeclaration)) {
+            return Optional.empty();
+        }
+
+        if (!stateRedistributionModeIsNone(stateDeclaration)) {
+            throw new UnsupportedOperationException(
+                    "RedistributionMode "
+                            + stateDeclaration.getRedistributionMode().name()
+                            + " is not supported for window state.");
+        }
+
+        ListStateDescriptor<T> stateDescriptor =
+                new ListStateDescriptor<T>(
+                        stateDeclaration.getName(),
+                        TypeExtractor.createTypeInfo(
+                                
stateDeclaration.getTypeDescriptor().getTypeClass()));
+
+        try {
+            ListStateAdaptor<K, W, T> state =
+                    operator.getOrCreateKeyedState(namespace, 
windowSerializer, stateDescriptor);
+            state.setCurrentNamespace(namespace);
+            return Optional.of(state);
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    /** Retrieve window state of map type. */
+    public <KEY, V> Optional<MapState<KEY, V>> getWindowState(
+            MapStateDeclaration<KEY, V> stateDeclaration, W namespace) {
+        checkState(
+                !isMergingWindow,
+                "Retrieving the window state is not permitted when using 
merging windows, such as session windows.");
+
+        if (!isStateDeclared(stateDeclaration)) {
+            return Optional.empty();
+        }
+
+        if (!stateRedistributionModeIsNone(stateDeclaration)) {
+            throw new UnsupportedOperationException(
+                    "RedistributionMode "
+                            + stateDeclaration.getRedistributionMode().name()
+                            + " is not supported for window state.");
+        }
+
+        MapStateDescriptor<KEY, V> stateDescriptor =
+                new MapStateDescriptor<KEY, V>(
+                        stateDeclaration.getName(),
+                        TypeExtractor.createTypeInfo(
+                                
stateDeclaration.getKeyTypeDescriptor().getTypeClass()),
+                        TypeExtractor.createTypeInfo(
+                                
stateDeclaration.getValueTypeDescriptor().getTypeClass()));
+
+        try {
+            MapStateAdaptor<K, W, KEY, V> state =
+                    operator.getOrCreateKeyedState(namespace, 
windowSerializer, stateDescriptor);
+            state.setCurrentNamespace(namespace);
+            return Optional.of(state);
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    /** Retrieve window state of value type. */
+    public <T> Optional<ValueState<T>> getWindowState(
+            ValueStateDeclaration<T> stateDeclaration, W namespace) {
+        checkState(
+                !isMergingWindow,
+                "Retrieving the window state is not permitted when using 
merging windows, such as session windows.");
+
+        if (!isStateDeclared(stateDeclaration)) {
+            return Optional.empty();
+        }
+
+        if (!stateRedistributionModeIsNone(stateDeclaration)) {
+            throw new UnsupportedOperationException(
+                    "RedistributionMode "
+                            + stateDeclaration.getRedistributionMode().name()
+                            + " is not supported for window state.");
+        }
+
+        ValueStateDescriptor<T> stateDescriptor =
+                new ValueStateDescriptor<T>(
+                        stateDeclaration.getName(),
+                        TypeExtractor.createTypeInfo(
+                                
stateDeclaration.getTypeDescriptor().getTypeClass()));
+
+        try {
+            ValueStateAdaptor<K, W, T> state =
+                    operator.getOrCreateKeyedState(namespace, 
windowSerializer, stateDescriptor);
+            state.setCurrentNamespace(namespace);
+            return Optional.of(state);
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }

Review Comment:
   Reducing and aggregating state are missing.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -118,16 +130,50 @@ public <OUT> 
ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
         TypeInformation<OUT> outType;
         outType = 
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
 
-        KeyedProcessOperator<K, V, OUT> operator = new 
KeyedProcessOperator<>(processFunction);
-        Transformation<OUT> transform =
-                StreamUtils.getOneInputKeyedTransformation(
-                        "KeyedProcess", this, outType, operator, keySelector, 
keyType);
+        Transformation<OUT> transform;
+        Configuration configuration = getEnvironment().getConfiguration();
+
+        if (processFunction instanceof 
InternalOneInputWindowStreamProcessFunction) {
+            transform = transformWindow(outType, processFunction);
+        } else {
+            KeyedProcessOperator<K, V, OUT> operator = new 
KeyedProcessOperator<>(processFunction);
+            transform =
+                    StreamUtils.getOneInputKeyedTransformation(
+                            "KeyedProcess", this, outType, operator, 
keySelector, keyType);
+        }
         
transform.setAttribute(AttributeParser.parseAttribute(processFunction));
         environment.addOperator(transform);
         return StreamUtils.wrapWithConfigureHandle(
                 new NonKeyedPartitionStreamImpl<>(environment, transform));
     }
 
+    public <OUT> Transformation<OUT> transformWindow(
+            TypeInformation<OUT> outType, OneInputStreamProcessFunction<V, 
OUT> processFunction) {
+        Transformation<OUT> transform;
+        if (processFunction instanceof 
InternalOneInputWindowStreamProcessFunction) {
+            InternalOneInputWindowStreamProcessFunction<V, OUT, ?> 
internalWindowFunction =
+                    (InternalOneInputWindowStreamProcessFunction<V, OUT, ?>) 
processFunction;
+            WindowAssigner<V, ?> assigner = 
internalWindowFunction.getAssigner();
+            ListStateDescriptor<V> stateDesc =

Review Comment:
   This `ListStateDescriptor` is deprecated. I think the one we need is 
`org.apache.flink.api.common.state.v2.ListStateDescriptor` instead of 
`org.apache.flink.runtime.state.v2.ListStateDescriptor`. Please double-check 
this, and also check all other types of descriptors.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/WindowUtils.java:
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.utils;
+
+import 
org.apache.flink.datastream.api.extension.window.strategy.GlobalWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.SessionWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.SlidingTimeWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.TumblingTimeWindowStrategy;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import 
org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/** Some utils used in Window extension. */
+public class WindowUtils {

Review Comment:
   ```suggestion
   public final class WindowUtils {
   ```



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/function/InternalOneInputWindowStreamProcessFunction.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.function;
+
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.Set;
+
+/**
+ * A class that wrap a {@link OneInputWindowStreamProcessFunction} to internal 
processing.

Review Comment:
   ```suggestion
    * A class that wraps a {@link OneInputWindowStreamProcessFunction} into a 
process function. It will be translated to a window operator instead of a 
vanilla process operator.
   ```
   Please also update the other variants of 
`InternalXXXWindowStreamProcessFunction`.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/utils/TaggedUnion.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.utils;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Objects;
+
+/** Util class for implementing two input window operation. */
+@Internal
+public class TaggedUnion<T1, T2> {

Review Comment:
   Will the data that goes into the two-input window really be converted to 
`TaggedUnion`? What is our purpose in introducing this class?



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java:
##########
@@ -73,9 +80,27 @@ public <OUT> 
ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
 
         TypeInformation<OUT> outType =
                 
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
-        ProcessOperator<T, OUT> operator = new 
ProcessOperator<>(processFunction);
-        OneInputTransformation<T, OUT> outputTransform =
-                StreamUtils.getOneInputTransformation("Process", this, 
outType, operator);
+        OneInputTransformation<T, OUT> outputTransform;

Review Comment:
   I think that for methods which don't support windows, for example:
   
   ```
   public <T_OTHER, OUT> ProcessConfigurableAndNonKeyedPartitionStream<OUT> connectAndProcess(
               BroadcastStream<T_OTHER> other,
               TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction)
   ```
   
   we should check that the `TwoInputBroadcastStreamProcessFunction` is not an 
`InternalTwoInputNonBroadcastWindowStreamProcessFunction`. Otherwise it may 
produce unexpected behavior, as it is a valid 
`TwoInputNonBroadcastStreamProcessFunction` but has a dummy `processRecord` 
implementation.
   
   Please also check this for the other streams.
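
   A minimal sketch of the kind of guard I have in mind follows. The types are hypothetical stand-ins for the PR's function interfaces, and the method name and error message are assumptions, not an existing Flink utility.

```java
/** Hypothetical stand-ins for the PR's function types, used only to illustrate the guard. */
final class WindowGuardSketch {

    /** Stand-in for a regular process function. */
    interface ProcessFunction {}

    /** Stand-in for an internal wrapper that must be translated to a window operator. */
    interface InternalWindowFunction extends ProcessFunction {}

    private WindowGuardSketch() {}

    /** Rejects window functions in code paths that cannot translate them to a window operator. */
    static void checkNotWindowFunction(ProcessFunction function, String methodName) {
        if (function instanceof InternalWindowFunction) {
            throw new UnsupportedOperationException(
                    "Window functions are not supported by " + methodName + ".");
        }
    }

    public static void main(String[] args) {
        // A plain process function passes the check.
        checkNotWindowFunction(new ProcessFunction() {}, "connectAndProcess");
        try {
            // A window function is rejected instead of silently running its dummy logic.
            checkNotWindowFunction(new InternalWindowFunction() {}, "connectAndProcess");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Failing fast at graph construction time seems preferable to letting the dummy `processRecord` run at runtime.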



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java:
##########
@@ -118,16 +130,50 @@ public <OUT> 
ProcessConfigurableAndNonKeyedPartitionStream<OUT> process(
         TypeInformation<OUT> outType;
         outType = 
StreamUtils.getOutputTypeForOneInputProcessFunction(processFunction, getType());
 
-        KeyedProcessOperator<K, V, OUT> operator = new 
KeyedProcessOperator<>(processFunction);
-        Transformation<OUT> transform =
-                StreamUtils.getOneInputKeyedTransformation(
-                        "KeyedProcess", this, outType, operator, keySelector, 
keyType);
+        Transformation<OUT> transform;
+        Configuration configuration = getEnvironment().getConfiguration();

Review Comment:
   ?



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/extension/window/operators/OneInputWindowProcessOperator.java:
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.extension.window.operators;
+
+import org.apache.flink.api.common.state.v2.AppendingState;
+import org.apache.flink.api.common.state.v2.ListStateDescriptor;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateIterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import 
org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
+import 
org.apache.flink.datastream.api.extension.window.function.WindowProcessFunction;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import 
org.apache.flink.datastream.impl.extension.window.context.DefaultOneInputWindowContext;
+import 
org.apache.flink.datastream.impl.extension.window.context.WindowTriggerContext;
+import 
org.apache.flink.datastream.impl.extension.window.function.InternalOneInputWindowStreamProcessFunction;
+import org.apache.flink.datastream.impl.extension.window.utils.WindowUtils;
+import org.apache.flink.datastream.impl.operators.BaseKeyedProcessOperator;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.v2.internal.InternalAppendingState;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.runtime.state.v2.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Operator for {@link OneInputWindowStreamProcessFunction} in {@link KeyedPartitionStream}. */
+public class OneInputWindowProcessOperator<K, IN, OUT, W extends Window>

Review Comment:
   I suggest that we abstract the common logic shared by all `XXXWindowOperator` 
classes so the code can be reused. Since DS V2 is currently in the experimental 
stage and the code freeze is approaching, it's fair enough to do this 
refactoring in the next release.
   
   Further, we can also do this for V1's window operator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
