Zakelly commented on code in PR #25996:

@@ -0,0 +1,902 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.asyncprocessing.operators.windowing;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.SerializerFactory;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.v2.AppendingState;
+import org.apache.flink.api.common.state.v2.State;
+import org.apache.flink.api.common.state.v2.StateDescriptor;
+import org.apache.flink.api.common.state.v2.StateFuture;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
+import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.v2.internal.InternalAppendingState;
+import org.apache.flink.runtime.state.v2.internal.InternalListState;
+import org.apache.flink.runtime.state.v2.internal.InternalMergingState;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorAttributes;
+import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.OutputTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicReference;
+import static 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+ * An operator that implements the logic for windowing based on a {@link 
WindowAssigner} and {@link
+ * AsyncTrigger}. This is the async state access version of WindowOperator.
+ *
+ * <p>When an element arrives it gets assigned a key using a {@link 
KeySelector} and it gets
+ * assigned to zero or more windows using a {@link WindowAssigner}. Based on 
this, the element is
+ * put into panes. A pane is the bucket of elements that have the same key and 
same {@code Window}.
+ * An element can be in multiple panes if it was assigned to multiple windows 
by the {@code
+ * WindowAssigner}.
+ *
+ * <p>Each pane gets its own instance of the provided {@code Trigger}. This 
trigger determines when
+ * the contents of the pane should be processed to emit results. When a 
trigger fires, the given
+ * {@link InternalWindowFunction} is invoked to produce the results that are 
emitted for the pane to
+ * which the {@code Trigger} belongs.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
+public class AsyncWindowOperator<K, IN, ACC, OUT, W extends Window>
+        extends AbstractAsyncStateUdfStreamOperator<
+                OUT, InternalAsyncWindowFunction<ACC, OUT, K, W>>
+        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
+    // ------------------------------------------------------------------------
+    // Configuration values and user functions
+    // ------------------------------------------------------------------------
+    protected final WindowAssigner<? super IN, W> windowAssigner;
+    private final KeySelector<IN, K> keySelector;
+    private final AsyncTrigger<? super IN, ? super W> trigger;
+    private final StateDescriptor<?> windowStateDescriptor;
+    /** For serializing the key in checkpoints. */
+    protected final TypeSerializer<K> keySerializer;
+    /** For serializing the window in checkpoints. */
+    protected final TypeSerializer<W> windowSerializer;
+    /**
+     * The allowed lateness for elements. This is used for:
+     *
+     * <ul>
+     *   <li>Deciding if an element should be dropped from a window due to lateness.
+     *   <li>Clearing the state of a window if the system time passes the 
{@code window.maxTimestamp
+     *       + allowedLateness} landmark.
+     * </ul>
+     */
+    protected final long allowedLateness;
+    /**
+     * {@link OutputTag} to use for late arriving events. Elements for which {@code
+     * window.maxTimestamp + allowedLateness} is smaller than the current 
watermark will be emitted
+     * to this.
+     */
+    protected final OutputTag<IN> lateDataOutputTag;
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = 
+    protected transient Counter numLateRecordsDropped;
+    // ------------------------------------------------------------------------
+    // State that is not checkpointed
+    // ------------------------------------------------------------------------
+    /** The state in which the window contents are stored. Each window is a 
namespace. */
+    private transient InternalAppendingState<K, W, IN, ACC, ACC, ACC> 
+    /**
+     * The {@link #windowState}, typed to merging state for merging windows. 
Null if the window
+     * state is not mergeable. TODO: Not supported currently.
+     */
+    private transient InternalMergingState<K, W, IN, ACC, ACC, ACC> 
+    /**
+     * The state that holds the merging window metadata (the sets that 
describe what is merged).
+     * TODO: Not supported currently.
+     */
+    private transient InternalListState<K, VoidNamespace, Tuple2<W, W>> 
+    /**
+     * This is given to the {@code InternalWindowFunction} for emitting 
elements with a given
+     * timestamp.
+     */
+    protected transient TimestampedCollector<OUT> timestampedCollector;
+    protected transient Context triggerContext;
+    protected transient WindowContext processContext;
+    protected transient DeclaredVariable<W> windowDeclaredVariable;
+    protected transient DeclaredVariable<K> keyDeclaredVariable;
+    protected transient WindowAssigner.WindowAssignerContext 
+    // ------------------------------------------------------------------------
+    // State that needs to be checkpointed
+    // ------------------------------------------------------------------------
+    protected transient InternalTimerService<W> internalTimerService;
+    /** Creates a new {@code AsyncWindowOperator} based on the given policies and 
user functions. */
+    public AsyncWindowOperator(
+            WindowAssigner<? super IN, W> windowAssigner,
+            TypeSerializer<W> windowSerializer,
+            KeySelector<IN, K> keySelector,
+            TypeSerializer<K> keySerializer,
+            StateDescriptor<?> windowStateDescriptor,
+            InternalAsyncWindowFunction<ACC, OUT, K, W> windowFunction,
+            AsyncTrigger<? super IN, ? super W> trigger,
+            long allowedLateness,
+            OutputTag<IN> lateDataOutputTag) {
+        super(windowFunction);
+        checkArgument(allowedLateness >= 0);
+        checkArgument(
+                windowStateDescriptor == null || 
+                "window state serializer is not properly initialized");
+        this.windowAssigner = checkNotNull(windowAssigner);
+        this.windowSerializer = checkNotNull(windowSerializer);
+        this.keySelector = checkNotNull(keySelector);
+        this.keySerializer = checkNotNull(keySerializer);
+        this.windowStateDescriptor = windowStateDescriptor;
+        this.trigger = checkNotNull(trigger);
+        this.allowedLateness = allowedLateness;
+        this.lateDataOutputTag = lateDataOutputTag;
+"Initialize async window operator with trigger " + trigger);
+    }
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+    }
+    @Override
+    public void setProcessingTimeService(ProcessingTimeService 
processingTimeService) {
+        super.setProcessingTimeService(processingTimeService);
+    }
+    @Override
+    public void open() throws Exception {
+        this.numLateRecordsDropped = 
+        timestampedCollector = new TimestampedCollector<>(output);
+        internalTimerService = getInternalTimerService("window-timers", 
windowSerializer, this);
+        keyDeclaredVariable =
+                declarationContext.declareVariable(
+                        keySerializer, "_AsyncWindowOperator$key", 
+        windowDeclaredVariable =
+                declarationContext.declareVariable(
+                        windowSerializer,
+                        "_AsyncWindowOperator$window",
+                        windowSerializer::createInstance);
+        triggerContext = new Context(keyDeclaredVariable, 
+        processContext = new WindowContext(windowDeclaredVariable);
+        windowAssignerContext =
+                new WindowAssigner.WindowAssignerContext() {
+                    @Override
+                    public long getCurrentProcessingTime() {
+                        return internalTimerService.currentProcessingTime();
+                    }
+                };
+        // create (or restore) the state that holds the actual window contents
+        // NOTE - the state may be null in the case of the overriding evicting 
window operator
+        if (windowStateDescriptor != null) {
+            windowState =
+                    stateWrapWithNamespace(
+                            getAsyncKeyedStateBackend()
+                                    .getOrCreateKeyedState(
+                                            windowSerializer.createInstance(),
+                                            windowSerializer,
+                                            windowStateDescriptor),
+                            windowDeclaredVariable);
+        }
+        // create the typed and helper states for merging windows
+        if (windowAssigner instanceof MergingWindowAssigner) {
+            throw new UnsupportedOperationException(
+                    "Async WindowOperator not support merging window (e.g. 
session window) yet.");
+        }
+    }
+    @Override
+    public void close() throws Exception {
+        super.close();
+        timestampedCollector = null;
+        triggerContext = null;
+        processContext = null;
+        windowAssignerContext = null;
+    }
+    @Override
+    public void processElement(StreamRecord<IN> element) throws Exception {
+        final Collection<W> elementWindows =
+                windowAssigner.assignWindows(
+                        element.getValue(), element.getTimestamp(), 
+        final K key = (K) getCurrentKey();
+        Collection<StateFuture<Boolean>> windowFutures = new ArrayList<>();
+        if (windowAssigner instanceof MergingWindowAssigner) {
+            throw new UnsupportedOperationException(
+                    "Async WindowOperator not support merging window (e.g. 
session window) yet.");
+        } else {
+            for (W window : elementWindows) {
+                // drop if the window is already late
+                if (isWindowLate(window)) {
+                    continue;

Review Comment:
   Maybe we should add `StateFutureUtils.completedFuture(true)` to `windowFutures` here, before the `continue`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail:

For queries about this service, please contact Infrastructure at:

Reply via email to