This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new be9f4f6cc6 [GLUTEN-10546][FLINK] Support all flink operators for 
nexmark (#10548)
be9f4f6cc6 is described below

commit be9f4f6cc63aa7c6423b1a3c789740edabc0d09b
Author: shuai.xu <[email protected]>
AuthorDate: Thu Sep 4 17:17:02 2025 +0800

    [GLUTEN-10546][FLINK] Support all flink operators for nexmark (#10548)
    
    * [GLUTEN-10546][FLINK] Support all flink operators for nexmark
---
 .github/workflows/flink.yml                        |   2 +-
 gluten-flink/docs/Flink.md                         |   2 +-
 ...upAggregate.java => StreamExecDeduplicate.java} | 203 +++++++++---------
 .../stream/StreamExecGlobalWindowAggregate.java    |   4 +-
 .../exec/stream/StreamExecGroupAggregate.java      |  26 ++-
 ...te.java => StreamExecGroupWindowAggregate.java} | 234 ++++++++-------------
 .../stream/StreamExecLocalWindowAggregate.java     |   3 +-
 .../plan/nodes/exec/stream/StreamExecRank.java     |  63 +++++-
 ...gregate.java => StreamExecWindowAggregate.java} |  90 +++-----
 .../org/apache/gluten/rexnode/WindowUtils.java     |   4 +
 .../rexnode/functions/BaseRexCallConverters.java   |   4 +
 .../rexnode/functions/RexCallConverterFactory.java |   8 +-
 .../operators/GlutenVectorOneInputOperator.java    |  58 ++++-
 .../operators/GlutenVectorSourceFunction.java      |  69 +++++-
 .../operators/GlutenVectorTwoInputOperator.java    |  66 ++++--
 .../apache/gluten/util/LogicalTypeConverter.java   |   7 +-
 16 files changed, 471 insertions(+), 372 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index ed579c0729..6c929887ce 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
+          cd velox4j && git reset --hard 
1cdeb1a8384967499919e655d55a66f2daa9d55c
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index b1fefc93e5..bfff2bee9b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
+git reset --hard 1cdeb1a8384967499919e655d55a66f2daa9d55c
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
similarity index 54%
copy from 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
copy to 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 9319bbc846..ed89b71774 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -16,21 +16,20 @@
  */
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
-import org.apache.gluten.rexnode.AggregateCallConverter;
-import org.apache.gluten.rexnode.Utils;
 import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
-import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
-import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
+import io.github.zhztheplayer.velox4j.plan.DeduplicateNode;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamRankNode;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -39,125 +38,123 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.calcite.rel.core.AggregateCall;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE_ENABLED;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Stream {@link ExecNode} for unbounded group aggregate.
- *
- * <p>This node does support un-splittable aggregate function (e.g. 
STDDEV_POP).
+ * Stream {@link ExecNode} which deduplicates on keys and keeps only first row 
or last row. This node
+ * is an optimization of {@link StreamExecRank} for some special cases. 
Compared to {@link
+ * StreamExecRank}, this node could use mini-batch and access less state.
  */
 @ExecNodeMetadata(
-    name = "stream-exec-group-aggregate",
+    name = "stream-exec-deduplicate",
     version = 1,
-    consumedOptions = {"table.exec.mini-batch.enabled", 
"table.exec.mini-batch.size"},
-    producedTransformations = 
StreamExecGroupAggregate.GROUP_AGGREGATE_TRANSFORMATION,
+    consumedOptions = {
+      "table.exec.mini-batch.enabled",
+      "table.exec.mini-batch.size",
+      "table.exec.deduplicate.insert-update-after-sensitive-enabled",
+      "table.exec.deduplicate.mini-batch.compact-changes-enabled"
+    },
+    producedTransformations = StreamExecDeduplicate.DEDUPLICATE_TRANSFORMATION,
     minPlanVersion = FlinkVersion.v1_15,
     minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGroupAggregate extends StreamExecAggregateBase {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(StreamExecGroupAggregate.class);
+public class StreamExecDeduplicate extends ExecNodeBase<RowData>
+    implements StreamExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
 
-  public static final String GROUP_AGGREGATE_TRANSFORMATION = 
"group-aggregate";
+  public static final String DEDUPLICATE_TRANSFORMATION = "deduplicate";
 
-  public static final String STATE_NAME = "groupAggregateState";
+  public static final String FIELD_NAME_UNIQUE_KEYS = "uniqueKeys";
+  public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
+  public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
+  public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE = 
"generateUpdateBefore";
+  public static final String STATE_NAME = "deduplicateState";
 
-  @JsonProperty(FIELD_NAME_GROUPING)
-  private final int[] grouping;
+  @JsonProperty(FIELD_NAME_UNIQUE_KEYS)
+  private final int[] uniqueKeys;
 
-  @JsonProperty(FIELD_NAME_AGG_CALLS)
-  private final AggregateCall[] aggCalls;
+  @JsonProperty(FIELD_NAME_IS_ROWTIME)
+  private final boolean isRowtime;
 
-  /** Each element indicates whether the corresponding agg call needs 
`retract` method. */
-  @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS)
-  private final boolean[] aggCallNeedRetractions;
+  @JsonProperty(FIELD_NAME_KEEP_LAST_ROW)
+  private final boolean keepLastRow;
 
-  /** Whether this node will generate UPDATE_BEFORE messages. */
   @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
   private final boolean generateUpdateBefore;
 
-  /** Whether this node consumes retraction messages. */
-  @JsonProperty(FIELD_NAME_NEED_RETRACTION)
-  private final boolean needRetraction;
-
   @Nullable
   @JsonProperty(FIELD_NAME_STATE)
   @JsonInclude(JsonInclude.Include.NON_NULL)
   private final List<StateMetadata> stateMetadataList;
 
-  public StreamExecGroupAggregate(
+  public StreamExecDeduplicate(
       ReadableConfig tableConfig,
-      int[] grouping,
-      AggregateCall[] aggCalls,
-      boolean[] aggCallNeedRetractions,
+      int[] uniqueKeys,
+      boolean isRowtime,
+      boolean keepLastRow,
       boolean generateUpdateBefore,
-      boolean needRetraction,
-      @Nullable Long stateTtlFromHint,
       InputProperty inputProperty,
       RowType outputType,
       String description) {
     this(
         ExecNodeContext.newNodeId(),
-        ExecNodeContext.newContext(StreamExecGroupAggregate.class),
-        ExecNodeContext.newPersistedConfig(StreamExecGroupAggregate.class, 
tableConfig),
-        grouping,
-        aggCalls,
-        aggCallNeedRetractions,
+        ExecNodeContext.newContext(StreamExecDeduplicate.class),
+        ExecNodeContext.newPersistedConfig(StreamExecDeduplicate.class, 
tableConfig),
+        uniqueKeys,
+        isRowtime,
+        keepLastRow,
         generateUpdateBefore,
-        needRetraction,
-        StateMetadata.getOneInputOperatorDefaultMeta(stateTtlFromHint, 
tableConfig, STATE_NAME),
+        StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME),
         Collections.singletonList(inputProperty),
         outputType,
         description);
   }
 
   @JsonCreator
-  public StreamExecGroupAggregate(
+  public StreamExecDeduplicate(
       @JsonProperty(FIELD_NAME_ID) int id,
       @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
       @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
-      @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
-      @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
-      @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS) boolean[] 
aggCallNeedRetractions,
+      @JsonProperty(FIELD_NAME_UNIQUE_KEYS) int[] uniqueKeys,
+      @JsonProperty(FIELD_NAME_IS_ROWTIME) boolean isRowtime,
+      @JsonProperty(FIELD_NAME_KEEP_LAST_ROW) boolean keepLastRow,
       @JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean 
generateUpdateBefore,
-      @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
       @Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> 
stateMetadataList,
       @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
       @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
       @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
     super(id, context, persistedConfig, inputProperties, outputType, 
description);
-    this.grouping = checkNotNull(grouping);
-    this.aggCalls = checkNotNull(aggCalls);
-    this.aggCallNeedRetractions = checkNotNull(aggCallNeedRetractions);
-    checkArgument(aggCalls.length == aggCallNeedRetractions.length);
+    checkArgument(inputProperties.size() == 1);
+    this.uniqueKeys = checkNotNull(uniqueKeys);
+    this.isRowtime = isRowtime;
+    this.keepLastRow = keepLastRow;
     this.generateUpdateBefore = generateUpdateBefore;
-    this.needRetraction = needRetraction;
     this.stateMetadataList = stateMetadataList;
   }
 
@@ -165,72 +162,78 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
   @Override
   protected Transformation<RowData> translateToPlanInternal(
       PlannerBase planner, ExecNodeConfig config) {
-
-    final long stateRetentionTime =
-        StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList);
-    if (grouping.length > 0 && stateRetentionTime < 0) {
-      LOG.warn(
-          "No state retention interval configured for a query which 
accumulates state. "
-              + "Please provide a query configuration with valid retention 
interval to prevent excessive "
-              + "state size. You may specify a retention time of 0 to not 
clean up the state.");
-    }
-
     final ExecEdge inputEdge = getInputEdges().get(0);
     final Transformation<RowData> inputTransform =
         (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
     final RowType inputRowType = (RowType) inputEdge.getOutputType();
+    final InternalTypeInfo<RowData> rowTypeInfo =
+        (InternalTypeInfo<RowData>) inputTransform.getOutputType();
+    final TypeSerializer<RowData> rowSerializer =
+        
rowTypeInfo.createSerializer(planner.getExecEnv().getConfig().getSerializerConfig());
+    final OneInputStreamOperator operator;
+
+    long stateRetentionTime =
+        StateMetadata.getStateTtlForOneInputOperator(config, 
stateMetadataList);
 
     // --- Begin Gluten-specific code changes ---
     io.github.zhztheplayer.velox4j.type.RowType inputType =
         (io.github.zhztheplayer.velox4j.type.RowType) 
LogicalTypeConverter.toVLType(inputRowType);
-    List<FieldAccessTypedExpr> groupingKeys = 
Utils.generateFieldAccesses(inputType, grouping);
-    List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls, 
inputType);
     io.github.zhztheplayer.velox4j.type.RowType outputType =
         (io.github.zhztheplayer.velox4j.type.RowType)
             LogicalTypeConverter.toVLType(getOutputType());
-    checkArgument(outputType.getNames().size() == grouping.length + 
aggCalls.length);
-    List<String> aggNames =
-        outputType.getNames().stream()
-            .skip(grouping.length)
-            .limit(aggCalls.length)
-            .collect(Collectors.toList());
-    // TODO: velox agg may not equal to flink
-    PlanNode aggregation =
-        new AggregationNode(
-            PlanNodeIdGenerator.newId(),
-            AggregateStep.SINGLE,
-            groupingKeys,
-            groupingKeys,
-            aggNames,
-            aggregates,
-            false,
-            List.of(new EmptyNode(inputType)),
-            null,
-            List.of());
-    final OneInputStreamOperator operator =
-        new GlutenVectorOneInputOperator(
-            new StatefulPlanNode(aggregation.getId(), aggregation),
-            PlanNodeIdGenerator.newId(),
-            inputType,
-            Map.of(aggregation.getId(), outputType));
+    if (isRowtime) {
+      int rowtimeIndex = -1;
+      for (int i = 0; i < inputRowType.getFieldCount(); ++i) {
+        if (TypeCheckUtils.isRowTime(inputRowType.getTypeAt(i))) {
+          rowtimeIndex = i;
+          break;
+        }
+      }
+      boolean generateInsert =
+          
config.get(TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE_ENABLED);
+      PlanNode deduplicateNode =
+          new DeduplicateNode(
+              PlanNodeIdGenerator.newId(),
+              List.of(new EmptyNode(inputType)), // sources
+              outputType,
+              stateRetentionTime,
+              rowtimeIndex,
+              generateUpdateBefore,
+              generateInsert,
+              keepLastRow);
+      List<Integer> keyIndexes = 
Arrays.stream(uniqueKeys).boxed().collect(Collectors.toList());
+      PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
+      final PlanNode streamRankNode =
+          new StreamRankNode(
+              PlanNodeIdGenerator.newId(),
+              List.of(new EmptyNode(inputType)), // sources
+              keySelectorSpec,
+              deduplicateNode,
+              outputType);
+      operator =
+          new GlutenVectorOneInputOperator(
+              new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
+              PlanNodeIdGenerator.newId(),
+              inputType,
+              Map.of(streamRankNode.getId(), outputType));
+    } else {
+      throw new RuntimeException("ProcTime in deduplicate is not supported.");
+    }
     // --- End Gluten-specific code changes ---
 
-    // partitioned aggregation
     final OneInputTransformation<RowData, RowData> transform =
         ExecNodeUtil.createOneInputTransformation(
             inputTransform,
-            createTransformationMeta(GROUP_AGGREGATE_TRANSFORMATION, config),
+            createTransformationMeta(DEDUPLICATE_TRANSFORMATION, config),
             operator,
-            InternalTypeInfo.of(getOutputType()),
+            rowTypeInfo,
             inputTransform.getParallelism(),
             false);
 
-    // set KeyType and Selector for state
     final RowDataKeySelector selector =
         KeySelectorUtil.getRowDataSelector(
-            planner.getFlinkContext().getClassLoader(),
-            grouping,
-            InternalTypeInfo.of(inputRowType));
+            planner.getFlinkContext().getClassLoader(), uniqueKeys, 
rowTypeInfo);
     transform.setStateKeySelector(selector);
     transform.setStateKeyType(selector.getProducedType());
 
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index 6c12febf40..ee8c91a375 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -184,6 +184,7 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     final ZoneId shiftTimeZone =
         TimeWindowUtil.getShiftTimeZone(
             windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
+
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
     io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -250,7 +251,8 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
             slide,
             offset,
             windowType,
-            outputType);
+            outputType,
+            rowtimeIndex);
     final OneInputStreamOperator windowOperator =
         new GlutenVectorOneInputOperator(
             new StatefulPlanNode(windowAgg.getId(), windowAgg),
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index 9319bbc846..6758111254 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -23,10 +23,11 @@ import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
 import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.GroupAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.GroupAggsHandlerNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
 
@@ -60,6 +61,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -194,19 +196,15 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
             .skip(grouping.length)
             .limit(aggCalls.length)
             .collect(Collectors.toList());
+    List<Integer> keyIndexes = 
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
+    PlanNode aggsHandlerNode =
+        new GroupAggsHandlerNode(
+            PlanNodeIdGenerator.newId(), outputType, generateUpdateBefore, 
needRetraction);
     // TODO: velox agg may not equal to flink
     PlanNode aggregation =
-        new AggregationNode(
-            PlanNodeIdGenerator.newId(),
-            AggregateStep.SINGLE,
-            groupingKeys,
-            groupingKeys,
-            aggNames,
-            aggregates,
-            false,
-            List.of(new EmptyNode(inputType)),
-            null,
-            List.of());
+        new GroupAggregationNode(
+            PlanNodeIdGenerator.newId(), aggsHandlerNode, keySelectorSpec, 
outputType);
     final OneInputStreamOperator operator =
         new GlutenVectorOneInputOperator(
             new StatefulPlanNode(aggregation.getId(), aggregation),
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
similarity index 57%
copy from 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
copy to 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index 6c12febf40..a2ce1425ea 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -18,35 +18,32 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.gluten.rexnode.AggregateCallConverter;
 import org.apache.gluten.rexnode.Utils;
-import org.apache.gluten.rexnode.WindowUtils;
 import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
 
 import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.GroupWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.GroupWindowAggsHandlerNode;
 import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
 import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
@@ -54,55 +51,59 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
-import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
-import org.apache.flink.table.runtime.groupwindow.WindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.util.TimeWindowUtil;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.commons.math3.util.ArithmeticUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
-import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.hasRowIntervalType;
+import static 
org.apache.flink.table.planner.plan.utils.AggregateUtil.isRowtimeAttribute;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Stream {@link ExecNode} for window table-valued based global aggregate. */
+/**
+ * Stream {@link ExecNode} for either group window aggregate or group window 
table aggregate.
+ *
+ * <p>The differences between {@link StreamExecWindowAggregate} and {@link
+ * StreamExecGroupWindowAggregate} is that, the former is translated from 
window TVF syntax, but the
+ * latter is from the legacy GROUP WINDOW FUNCTION syntax. In the long 
future, {@link
+ * StreamExecGroupWindowAggregate} will be dropped.
+ */
 @ExecNodeMetadata(
-    name = "stream-exec-global-window-aggregate",
+    name = "stream-exec-group-window-aggregate",
     version = 1,
-    consumedOptions = "table.local-time-zone",
-    producedTransformations =
-        StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
+    consumedOptions = {
+      "table.local-time-zone",
+      "table.exec.mini-batch.enabled",
+      "table.exec.mini-batch.size"
+    },
+    producedTransformations = 
StreamExecGroupWindowAggregate.GROUP_WINDOW_AGGREGATE_TRANSFORMATION,
     minPlanVersion = FlinkVersion.v1_15,
     minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBase {
+public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(StreamExecGroupWindowAggregate.class);
 
-  public static final String GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION = 
"global-window-aggregate";
+  public static final String GROUP_WINDOW_AGGREGATE_TRANSFORMATION = 
"group-window-aggregate";
 
-  public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE = 
"localAggInputRowType";
+  public static final String FIELD_NAME_WINDOW = "window";
+  public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = 
"namedWindowProperties";
 
   @JsonProperty(FIELD_NAME_GROUPING)
   private final int[] grouping;
@@ -110,80 +111,101 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
   @JsonProperty(FIELD_NAME_AGG_CALLS)
   private final AggregateCall[] aggCalls;
 
-  @JsonProperty(FIELD_NAME_WINDOWING)
-  private final WindowingStrategy windowing;
+  @JsonProperty(FIELD_NAME_WINDOW)
+  private final LogicalWindow window;
 
   @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
   private final NamedWindowProperty[] namedWindowProperties;
 
-  /** The input row type of this node's local agg. */
-  @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-  private final RowType localAggInputRowType;
-
   @JsonProperty(FIELD_NAME_NEED_RETRACTION)
   private final boolean needRetraction;
 
-  public StreamExecGlobalWindowAggregate(
+  public StreamExecGroupWindowAggregate(
       ReadableConfig tableConfig,
       int[] grouping,
       AggregateCall[] aggCalls,
-      WindowingStrategy windowing,
+      LogicalWindow window,
       NamedWindowProperty[] namedWindowProperties,
-      Boolean needRetraction,
+      boolean needRetraction,
       InputProperty inputProperty,
-      RowType localAggInputRowType,
       RowType outputType,
       String description) {
     this(
         ExecNodeContext.newNodeId(),
-        ExecNodeContext.newContext(StreamExecGlobalWindowAggregate.class),
-        
ExecNodeContext.newPersistedConfig(StreamExecGlobalWindowAggregate.class, 
tableConfig),
+        ExecNodeContext.newContext(StreamExecGroupWindowAggregate.class),
+        
ExecNodeContext.newPersistedConfig(StreamExecGroupWindowAggregate.class, 
tableConfig),
         grouping,
         aggCalls,
-        windowing,
+        window,
         namedWindowProperties,
         needRetraction,
         Collections.singletonList(inputProperty),
-        localAggInputRowType,
         outputType,
         description);
   }
 
   @JsonCreator
-  public StreamExecGlobalWindowAggregate(
+  public StreamExecGroupWindowAggregate(
       @JsonProperty(FIELD_NAME_ID) int id,
       @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
       @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
       @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
       @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
-      @JsonProperty(FIELD_NAME_WINDOWING) WindowingStrategy windowing,
+      @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window,
       @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) NamedWindowProperty[] 
namedWindowProperties,
-      @Nullable @JsonProperty(FIELD_NAME_NEED_RETRACTION) Boolean 
needRetraction,
+      @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
       @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
-      @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType 
localAggInputRowType,
       @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
       @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
     super(id, context, persistedConfig, inputProperties, outputType, 
description);
+    checkArgument(inputProperties.size() == 1);
     this.grouping = checkNotNull(grouping);
     this.aggCalls = checkNotNull(aggCalls);
-    this.windowing = checkNotNull(windowing);
+    this.window = checkNotNull(window);
     this.namedWindowProperties = checkNotNull(namedWindowProperties);
-    this.localAggInputRowType = localAggInputRowType;
-    this.needRetraction = Optional.ofNullable(needRetraction).orElse(false);
+    this.needRetraction = needRetraction;
   }
 
   @SuppressWarnings("unchecked")
   @Override
   protected Transformation<RowData> translateToPlanInternal(
       PlannerBase planner, ExecNodeConfig config) {
+    final boolean isCountWindow;
+    if (window instanceof TumblingGroupWindow) {
+      isCountWindow = hasRowIntervalType(((TumblingGroupWindow) 
window).size());
+    } else if (window instanceof SlidingGroupWindow) {
+      isCountWindow = hasRowIntervalType(((SlidingGroupWindow) window).size());
+    } else {
+      isCountWindow = false;
+    }
+
+    if (isCountWindow && grouping.length > 0 && config.getStateRetentionTime() 
< 0) {
+      LOGGER.warn(
+          "No state retention interval configured for a query which 
accumulates state. "
+              + "Please provide a query configuration with valid retention 
interval to prevent "
+              + "excessive state size. You may specify a retention time of 0 
to not clean up the state.");
+    }
+
     final ExecEdge inputEdge = getInputEdges().get(0);
     final Transformation<RowData> inputTransform =
         (Transformation<RowData>) inputEdge.translateToPlan(planner);
     final RowType inputRowType = (RowType) inputEdge.getOutputType();
 
-    final ZoneId shiftTimeZone =
-        TimeWindowUtil.getShiftTimeZone(
-            windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
+    final int inputTimeFieldIndex;
+    if (isRowtimeAttribute(window.timeAttribute())) {
+      inputTimeFieldIndex = window.timeAttribute().getFieldIndex();
+      if (inputTimeFieldIndex < 0) {
+        throw new TableException(
+            "Group window must defined on a time attribute, "
+                + "but the time attribute can't be found.\n"
+                + "This should never happen. Please file an issue.");
+      }
+    } else {
+      inputTimeFieldIndex = -1;
+    }
+
+    WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(config, window);
+
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
     io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -202,54 +224,20 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     List<Integer> keyIndexes = 
Arrays.stream(grouping).boxed().collect(Collectors.toList());
     PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
     // TODO: support more window types.
-    Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
-        WindowUtils.extractWindowParameters(windowing);
-    long size = windowSpecParams.f0;
-    long slide = windowSpecParams.f1;
-    long offset = windowSpecParams.f2;
-    int rowtimeIndex = windowSpecParams.f3;
-    int windowType = windowSpecParams.f4;
     PartitionFunctionSpec sliceAssignerSpec =
-        new StreamWindowPartitionFunctionSpec(
-            inputType, rowtimeIndex, size, slide, offset, windowType);
-    PlanNode aggregation =
-        new AggregationNode(
-            PlanNodeIdGenerator.newId(),
-            AggregateStep.SINGLE,
-            groupingKeys,
-            groupingKeys,
-            aggNames,
-            aggregates,
-            false,
-            List.of(new EmptyNode(inputType)),
-            null,
-            List.of());
-    PlanNode localAgg =
-        new AggregationNode(
-            PlanNodeIdGenerator.newId(),
-            AggregateStep.SINGLE,
-            groupingKeys,
-            groupingKeys,
-            aggNames,
-            aggregates,
-            false,
-            List.of(new EmptyNode(inputType)),
-            null,
-            List.of());
+        new StreamWindowPartitionFunctionSpec(inputType, inputTimeFieldIndex, 
0L, 0L, 0L, 1);
+    PlanNode aggregation = new 
GroupWindowAggsHandlerNode(PlanNodeIdGenerator.newId(), outputType);
     PlanNode windowAgg =
-        new StreamWindowAggregationNode(
+        new GroupWindowAggregationNode(
             PlanNodeIdGenerator.newId(),
             aggregation,
-            localAgg,
             keySelectorSpec,
             sliceAssignerSpec,
-            ArithmeticUtils.gcd(size, slide),
-            TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
-            false,
-            size,
-            slide,
-            offset,
-            windowType,
+            emitStrategy.getAllowLateness(),
+            emitStrategy.produceUpdates(),
+            inputTimeFieldIndex,
+            true, // TODO: get from window attributes
+            1, // TODO: get from window attributes
             outputType);
     final OneInputStreamOperator windowOperator =
         new GlutenVectorOneInputOperator(
@@ -259,61 +247,23 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
             Map.of(windowAgg.getId(), outputType));
     // --- End Gluten-specific code changes ---
 
-    final RowDataKeySelector selector =
-        KeySelectorUtil.getRowDataSelector(
-            planner.getFlinkContext().getClassLoader(),
-            grouping,
-            InternalTypeInfo.of(inputRowType));
-
     final OneInputTransformation<RowData, RowData> transform =
         ExecNodeUtil.createOneInputTransformation(
             inputTransform,
-            createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION, 
config),
+            createTransformationMeta(GROUP_WINDOW_AGGREGATE_TRANSFORMATION, 
config),
             SimpleOperatorFactory.of(windowOperator),
             InternalTypeInfo.of(getOutputType()),
             inputTransform.getParallelism(),
-            WINDOW_AGG_MEMORY_RATIO,
             false);
 
     // set KeyType and Selector for state
+    final RowDataKeySelector selector =
+        KeySelectorUtil.getRowDataSelector(
+            planner.getFlinkContext().getClassLoader(),
+            grouping,
+            InternalTypeInfo.of(inputRowType));
     transform.setStateKeySelector(selector);
     transform.setStateKeyType(selector.getProducedType());
     return transform;
   }
-
-  private GeneratedNamespaceAggsHandleFunction<Long> createAggsHandler(
-      String name,
-      SliceAssigner sliceAssigner,
-      AggregateInfoList aggInfoList,
-      int mergedAccOffset,
-      boolean mergedAccIsOnHeap,
-      DataType[] mergedAccExternalTypes,
-      ExecNodeConfig config,
-      ClassLoader classLoader,
-      RelBuilder relBuilder,
-      ZoneId shifTimeZone) {
-    final AggsHandlerCodeGenerator generator =
-        new AggsHandlerCodeGenerator(
-                new CodeGeneratorContext(config, classLoader),
-                relBuilder,
-                
JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
-                true) // copyInputField
-            .needAccumulate()
-            .needMerge(mergedAccOffset, mergedAccIsOnHeap, 
mergedAccExternalTypes);
-
-    final List<WindowProperty> windowProperties =
-        Arrays.asList(
-            Arrays.stream(namedWindowProperties)
-                .map(NamedWindowProperty::getProperty)
-                .toArray(WindowProperty[]::new));
-
-    return generator.generateNamespaceAggsHandler(
-        name,
-        aggInfoList,
-        JavaScalaConversionUtil.toScala(windowProperties),
-        sliceAssigner,
-        // we use window end timestamp to indicate a slicing window, see 
SliceAssigner
-        Long.class,
-        shifTimeZone);
-  }
 }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 30c80e8c7c..4ffcf7998c 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -219,7 +219,8 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
             slide,
             offset,
             windowType,
-            outputType);
+            outputType,
+            rowtimeIndex);
     final OneInputStreamOperator localAggOperator =
         new GlutenVectorOneInputOperator(
             new StatefulPlanNode(windowAgg.getId(), windowAgg),
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
index 6099f3dfb8..c1b87e8f60 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
@@ -23,8 +23,13 @@ import org.apache.gluten.util.PlanNodeIdGenerator;
 
 import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
 import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
 import io.github.zhztheplayer.velox4j.plan.PlanNode;
 import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamRankNode;
+import io.github.zhztheplayer.velox4j.plan.StreamTopNNode;
+import io.github.zhztheplayer.velox4j.plan.TopNNode;
 import io.github.zhztheplayer.velox4j.plan.TopNRowNumberNode;
 import io.github.zhztheplayer.velox4j.sort.SortOrder;
 
@@ -64,10 +69,13 @@ import javax.annotation.Nullable;
 import javax.swing.*;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RANK_TOPN_CACHE_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -200,6 +208,7 @@ public class StreamExecRank extends ExecNodeBase<RowData>
     RowType inputRowType = (RowType) inputEdge.getOutputType();
     InternalTypeInfo<RowData> inputRowTypeInfo = 
InternalTypeInfo.of(inputRowType);
     int[] sortFields = sortSpec.getFieldIndices();
+    long cacheSize = config.get(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
 
     // --- Begin Gluten-specific code changes ---
     io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -215,21 +224,55 @@ public class StreamExecRank extends ExecNodeBase<RowData>
             LogicalTypeConverter.toVLType(getOutputType());
     // TODO: velox RowNumber may not equal to flink
     int limit = 1;
-    final PlanNode rowNumberNode =
-        new TopNRowNumberNode(
+    final PlanNode topNNode;
+    if (outputRankNumber) {
+      topNNode =
+          new TopNRowNumberNode(
+              PlanNodeIdGenerator.newId(),
+              partitionKeys,
+              sortKeys,
+              sortOrders,
+              null,
+              limit,
+              List.of(new EmptyNode(inputType)));
+    } else {
+      topNNode =
+          new TopNNode(
+              PlanNodeIdGenerator.newId(),
+              sortKeys,
+              sortOrders,
+              limit,
+              false,
+              List.of(new EmptyNode(inputType)));
+    }
+    List<Integer> keyIndexes = 
Arrays.stream(partitionFields).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec keySelectorSpec = new 
HashPartitionFunctionSpec(inputType, keyIndexes);
+    List<Integer> sortKeyIndexes = 
Arrays.stream(sortFields).boxed().collect(Collectors.toList());
+    PartitionFunctionSpec sortKeySelectorSpec =
+        new HashPartitionFunctionSpec(inputType, sortKeyIndexes);
+    final PlanNode streamTopNNode =
+        new StreamTopNNode(
+            PlanNodeIdGenerator.newId(),
+            List.of(new EmptyNode(inputType)), // sources
+            topNNode,
+            sortKeySelectorSpec,
+            outputType,
+            generateUpdateBefore,
+            outputRankNumber,
+            cacheSize);
+    final PlanNode streamRankNode =
+        new StreamRankNode(
             PlanNodeIdGenerator.newId(),
-            partitionKeys,
-            sortKeys,
-            sortOrders,
-            null,
-            limit,
-            List.of(new EmptyNode(inputType)));
+            List.of(new EmptyNode(inputType)), // sources
+            keySelectorSpec,
+            streamTopNNode,
+            outputType);
     final OneInputStreamOperator operator =
         new GlutenVectorOneInputOperator(
-            new StatefulPlanNode(rowNumberNode.getId(), rowNumberNode),
+            new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
             PlanNodeIdGenerator.newId(),
             inputType,
-            Map.of(rowNumberNode.getId(), outputType));
+            Map.of(streamRankNode.getId(), outputType));
     // --- End Gluten-specific code changes ---
 
     OneInputTransformation<RowData, RowData> transform =
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
similarity index 77%
copy from 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
copy to 
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 6c12febf40..afe5d01483 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -43,8 +43,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -54,25 +52,18 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
-import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
-import org.apache.flink.table.runtime.groupwindow.WindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
 import org.apache.commons.math3.util.ArithmeticUtils;
 
 import javax.annotation.Nullable;
@@ -89,20 +80,29 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/** Stream {@link ExecNode} for window table-valued based global aggregate. */
+/**
+ * Stream {@link ExecNode} for window table-valued based aggregate.
+ *
+ * <p>The difference between {@link StreamExecWindowAggregate} and {@link
+ * StreamExecGroupWindowAggregate} is that this node is translated from window TVF syntax, but the
+ * other is from the legacy GROUP WINDOW FUNCTION syntax. In the long future, 
{@link
+ * StreamExecGroupWindowAggregate} will be dropped.
+ */
 @ExecNodeMetadata(
-    name = "stream-exec-global-window-aggregate",
+    name = "stream-exec-window-aggregate",
     version = 1,
     consumedOptions = "table.local-time-zone",
-    producedTransformations =
-        StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
+    producedTransformations = 
StreamExecWindowAggregate.WINDOW_AGGREGATE_TRANSFORMATION,
     minPlanVersion = FlinkVersion.v1_15,
     minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBase {
+public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
+
+  public static final String WINDOW_AGGREGATE_TRANSFORMATION = 
"window-aggregate";
 
-  public static final String GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION = 
"global-window-aggregate";
+  private static final long WINDOW_AGG_MEMORY_RATIO = 100;
 
-  public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE = 
"localAggInputRowType";
+  public static final String FIELD_NAME_WINDOWING = "windowing";
+  public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = 
"namedWindowProperties";
 
   @JsonProperty(FIELD_NAME_GROUPING)
   private final int[] grouping;
@@ -116,14 +116,10 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
   @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
   private final NamedWindowProperty[] namedWindowProperties;
 
-  /** The input row type of this node's local agg. */
-  @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-  private final RowType localAggInputRowType;
-
   @JsonProperty(FIELD_NAME_NEED_RETRACTION)
   private final boolean needRetraction;
 
-  public StreamExecGlobalWindowAggregate(
+  public StreamExecWindowAggregate(
       ReadableConfig tableConfig,
       int[] grouping,
       AggregateCall[] aggCalls,
@@ -131,26 +127,24 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
       NamedWindowProperty[] namedWindowProperties,
       Boolean needRetraction,
       InputProperty inputProperty,
-      RowType localAggInputRowType,
       RowType outputType,
       String description) {
     this(
         ExecNodeContext.newNodeId(),
-        ExecNodeContext.newContext(StreamExecGlobalWindowAggregate.class),
-        
ExecNodeContext.newPersistedConfig(StreamExecGlobalWindowAggregate.class, 
tableConfig),
+        ExecNodeContext.newContext(StreamExecWindowAggregate.class),
+        ExecNodeContext.newPersistedConfig(StreamExecWindowAggregate.class, 
tableConfig),
         grouping,
         aggCalls,
         windowing,
         namedWindowProperties,
         needRetraction,
         Collections.singletonList(inputProperty),
-        localAggInputRowType,
         outputType,
         description);
   }
 
   @JsonCreator
-  public StreamExecGlobalWindowAggregate(
+  public StreamExecWindowAggregate(
       @JsonProperty(FIELD_NAME_ID) int id,
       @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
       @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@@ -160,7 +154,6 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
       @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) NamedWindowProperty[] 
namedWindowProperties,
       @Nullable @JsonProperty(FIELD_NAME_NEED_RETRACTION) Boolean 
needRetraction,
       @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
-      @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType 
localAggInputRowType,
       @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
       @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
     super(id, context, persistedConfig, inputProperties, outputType, 
description);
@@ -168,7 +161,6 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     this.aggCalls = checkNotNull(aggCalls);
     this.windowing = checkNotNull(windowing);
     this.namedWindowProperties = checkNotNull(namedWindowProperties);
-    this.localAggInputRowType = localAggInputRowType;
     this.needRetraction = Optional.ofNullable(needRetraction).orElse(false);
   }
 
@@ -184,6 +176,7 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     final ZoneId shiftTimeZone =
         TimeWindowUtil.getShiftTimeZone(
             windowing.getTimeAttributeType(), 
TableConfigUtils.getLocalTimeZone(config));
+
     // --- Begin Gluten-specific code changes ---
     // TODO: velox window not equal to flink window.
     io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -250,7 +243,8 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
             slide,
             offset,
             windowType,
-            outputType);
+            outputType,
+            rowtimeIndex);
     final OneInputStreamOperator windowOperator =
         new GlutenVectorOneInputOperator(
             new StatefulPlanNode(windowAgg.getId(), windowAgg),
@@ -268,7 +262,7 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     final OneInputTransformation<RowData, RowData> transform =
         ExecNodeUtil.createOneInputTransformation(
             inputTransform,
-            createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION, 
config),
+            createTransformationMeta(WINDOW_AGGREGATE_TRANSFORMATION, config),
             SimpleOperatorFactory.of(windowOperator),
             InternalTypeInfo.of(getOutputType()),
             inputTransform.getParallelism(),
@@ -280,40 +274,4 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     transform.setStateKeyType(selector.getProducedType());
     return transform;
   }
-
-  private GeneratedNamespaceAggsHandleFunction<Long> createAggsHandler(
-      String name,
-      SliceAssigner sliceAssigner,
-      AggregateInfoList aggInfoList,
-      int mergedAccOffset,
-      boolean mergedAccIsOnHeap,
-      DataType[] mergedAccExternalTypes,
-      ExecNodeConfig config,
-      ClassLoader classLoader,
-      RelBuilder relBuilder,
-      ZoneId shifTimeZone) {
-    final AggsHandlerCodeGenerator generator =
-        new AggsHandlerCodeGenerator(
-                new CodeGeneratorContext(config, classLoader),
-                relBuilder,
-                
JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
-                true) // copyInputField
-            .needAccumulate()
-            .needMerge(mergedAccOffset, mergedAccIsOnHeap, 
mergedAccExternalTypes);
-
-    final List<WindowProperty> windowProperties =
-        Arrays.asList(
-            Arrays.stream(namedWindowProperties)
-                .map(NamedWindowProperty::getProperty)
-                .toArray(WindowProperty[]::new));
-
-    return generator.generateNamespaceAggsHandler(
-        name,
-        aggInfoList,
-        JavaScalaConversionUtil.toScala(windowProperties),
-        sliceAssigner,
-        // we use window end timestamp to indicate a slicing window, see 
SliceAssigner
-        Long.class,
-        shifTimeZone);
-  }
 }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index 60b6f71fa9..29cd64c30b 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -59,6 +59,8 @@ public class WindowUtils {
       if (windowOffset != null) {
         offset = windowOffset.toMillis();
       }
+    } else {
+      throw new RuntimeException("Not support window spec " + windowSpec);
     }
 
     if (windowing instanceof TimeAttributeWindowingStrategy && 
windowing.isRowtime()) {
@@ -69,6 +71,8 @@ public class WindowUtils {
       windowType = 1;
     } else if (windowing instanceof SliceAttachedWindowingStrategy) {
       rowtimeIndex = ((SliceAttachedWindowingStrategy) 
windowing).getSliceEnd();
+    } else {
+      throw new RuntimeException("Not support window strategy " + windowing);
     }
     return new Tuple5<Long, Long, Long, Integer, Integer>(
         size, slide, offset, rowtimeIndex, windowType);
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
index fa2385aaf1..fdc1ac5370 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
@@ -23,6 +23,7 @@ import org.apache.gluten.rexnode.ValidationResult;
 
 import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
 import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
 import io.github.zhztheplayer.velox4j.type.TimestampType;
 import io.github.zhztheplayer.velox4j.type.Type;
 
@@ -80,6 +81,9 @@ class DefaultRexCallConverter extends BaseRexCallConverter {
       if (sourceType instanceof TimestampType && resultType instanceof 
TimestampType) {
         return sourceExpr;
       }
+    } else if ("unix_timestamp".equals(functionName)) {
+      // TODO: this is a trick here for q12, refine it.
+      resultType = new BigIntType();
     }
     return new CallTypedExpr(resultType, params, functionName);
   }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 38e48cea9b..df7a6e8a90 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -91,7 +91,13 @@ public class RexCallConverterFactory {
                   () -> new 
BasicArithmeticOperatorRexCallConverter("lessthanorequal"),
                   () -> new StringCompareRexCallConverter("lessthanorequal"),
                   () -> new 
StringNumberCompareRexCallConverter("lessthanorequal"),
-                  () -> new 
TimestampIntervalRexCallConverter("lessthanorequal"))));
+                  () -> new 
TimestampIntervalRexCallConverter("lessthanorequal"))),
+          Map.entry("PROCTIME", Arrays.asList(() -> new 
DefaultRexCallConverter("unix_timestamp"))),
+          Map.entry("OR", Arrays.asList(() -> new 
DefaultRexCallConverter("or"))),
+          Map.entry("IS NOT NULL", Arrays.asList(() -> new 
DefaultRexCallConverter("is_not_null"))),
+          Map.entry(
+              "REGEXP_EXTRACT", Arrays.asList(() -> new 
DefaultRexCallConverter("regexp_extract"))),
+          Map.entry("LOWER", Arrays.asList(() -> new 
DefaultRexCallConverter("lower"))));
 
   public static RexCallConverter getConverter(RexCall callNode, 
RexConversionContext context) {
     String operatorName = callNode.getOperator().getName();
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index fe4ed41966..5319c61b79 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -39,13 +39,13 @@ import 
io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
 import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +69,6 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
   private Session session;
   private Query query;
   private ExternalStreams.BlockingQueue inputQueue;
-  private BufferAllocator allocator;
   private SerialTask task;
 
   public GlutenVectorOneInputOperator(
@@ -80,14 +79,9 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
     this.outputTypes = outputTypes;
   }
 
-  @Override
-  public void open() throws Exception {
-    super.open();
-    outElement = new StreamRecord(null);
+  void initGlutenTask() {
     memoryManager = MemoryManager.create(AllocationListener.NOOP);
     session = Velox4j.newSession(memoryManager);
-
-    inputQueue = session.externalStreamOps().newBlockingQueue();
    // add a mock input, as velox does not allow the source to be empty.
     StatefulPlanNode mockInput =
         new StatefulPlanNode(
@@ -101,8 +95,14 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
     LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
     LOG.debug("OutTypes: {}", outputTypes.keySet());
     query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
-    allocator = new RootAllocator(Long.MAX_VALUE);
     task = session.queryOps().execute(query);
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    outElement = new StreamRecord(null);
+    inputQueue = session.externalStreamOps().newBlockingQueue();
     ExternalStreamConnectorSplit split =
         new ExternalStreamConnectorSplit("connector-external-stream", 
inputQueue.id());
     task.addSplit(id, split);
@@ -138,7 +138,6 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
     task.close();
     session.close();
     memoryManager.close();
-    allocator.close();
   }
 
   @Override
@@ -160,4 +159,41 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
   public String getId() {
     return id;
   }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    // TODO: notify velox
+    super.prepareSnapshotPreBarrier(checkpointId);
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // TODO: implement it
+    task.snapshotState(0);
+    super.snapshotState(context);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    if (task == null) {
+      initGlutenTask();
+    }
+    // TODO: implement it
+    task.initializeState(0);
+    super.initializeState(context);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // TODO: notify velox
+    task.notifyCheckpointComplete(checkpointId);
+    super.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) throws Exception {
+    // TODO: notify velox
+    task.notifyCheckpointAborted(checkpointId);
+    super.notifyCheckpointAborted(checkpointId);
+  }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 4224c0aaa6..1bbd8993ba 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -31,6 +31,11 @@ import io.github.zhztheplayer.velox4j.session.Session;
 import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
@@ -45,7 +50,8 @@ import java.util.Map;
  * Gluten legacy source function, call velox plan to execute. It sends 
RowVector to downstream
  * instead of RowData to avoid data convert.
  */
-public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<StatefulElement> {
+public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<StatefulElement>
+    implements CheckpointedFunction, CheckpointListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(GlutenVectorSourceFunction.class);
 
   private final StatefulPlanNode planNode;
@@ -56,8 +62,9 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
 
   private Session session;
   private Query query;
-  BufferAllocator allocator;
+  private BufferAllocator allocator;
   private MemoryManager memoryManager;
+  private SerialTask task;
 
   public GlutenVectorSourceFunction(
       StatefulPlanNode planNode,
@@ -87,17 +94,24 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
     return split;
   }
 
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    System.out.println("InitializeState GlutenSourceFunction");
+    if (memoryManager == null) {
+      memoryManager = MemoryManager.create(AllocationListener.NOOP);
+      session = Velox4j.newSession(memoryManager);
+      query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+      allocator = new RootAllocator(Long.MAX_VALUE);
+
+      task = session.queryOps().execute(query);
+      task.addSplit(id, split);
+      task.noMoreSplits(id);
+    }
+  }
+
   @Override
   public void run(SourceContext<StatefulElement> sourceContext) throws 
Exception {
     LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
-    memoryManager = MemoryManager.create(AllocationListener.NOOP);
-    session = Velox4j.newSession(memoryManager);
-    query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
-    allocator = new RootAllocator(Long.MAX_VALUE);
-
-    SerialTask task = session.queryOps().execute(query);
-    task.addSplit(id, split);
-    task.noMoreSplits(id);
     while (isRunning) {
       UpIterator.State state = task.advance();
       if (state == UpIterator.State.AVAILABLE) {
@@ -126,4 +140,39 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
   public void cancel() {
     isRunning = false;
   }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) throws Exception {
+    // TODO: implement it
+    this.task.snapshotState(0);
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    if (memoryManager == null) {
+      memoryManager = MemoryManager.create(AllocationListener.NOOP);
+      session = Velox4j.newSession(memoryManager);
+      query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+      allocator = new RootAllocator(Long.MAX_VALUE);
+
+      task = session.queryOps().execute(query);
+      task.addSplit(id, split);
+      task.noMoreSplits(id);
+    }
+    System.out.println("InitializeState GlutenSourceFunction");
+    // TODO: implement it
+    this.task.initializeState(0);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // TODO: notify velox
+    this.task.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) throws Exception {
+    // TODO: notify velox
+    this.task.notifyCheckpointAborted(checkpointId);
+  }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
index d10fcfa05d..04ed555a3b 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
@@ -37,13 +37,13 @@ import 
io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
 import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
 import io.github.zhztheplayer.velox4j.type.RowType;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +73,6 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
   private Query query;
   private ExternalStreams.BlockingQueue leftInputQueue;
   private ExternalStreams.BlockingQueue rightInputQueue;
-  private BufferAllocator allocator;
   private SerialTask task;
 
   public GlutenVectorTwoInputOperator(
@@ -91,21 +90,26 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
     this.outputTypes = outputTypes;
   }
 
+  // initializeState is called before open, so the gluten task must be initialized there first.
+  private void initGlutenTask() {
+    memoryManager = MemoryManager.create(AllocationListener.NOOP);
+    session = Velox4j.newSession(memoryManager);
+    query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
+    task = session.queryOps().execute(query);
+    LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
+    LOG.debug("OutTypes: {}", outputTypes.keySet());
+    LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
+  }
+
   @Override
   public void open() throws Exception {
     super.open();
+    if (task == null) {
+      initGlutenTask();
+    }
     outElement = new StreamRecord(null);
-    memoryManager = MemoryManager.create(AllocationListener.NOOP);
-    session = Velox4j.newSession(memoryManager);
-
     leftInputQueue = session.externalStreamOps().newBlockingQueue();
     rightInputQueue = session.externalStreamOps().newBlockingQueue();
-    LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
-    LOG.debug("OutTypes: {}", outputTypes.keySet());
-    LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
-    query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
-    allocator = new RootAllocator(Long.MAX_VALUE);
-    task = session.queryOps().execute(query);
     ExternalStreamConnectorSplit leftSplit =
         new ExternalStreamConnectorSplit("connector-external-stream", 
leftInputQueue.id());
     ExternalStreamConnectorSplit rightSplit =
@@ -172,7 +176,6 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
     task.close();
     session.close();
     memoryManager.close();
-    allocator.close();
   }
 
   @Override
@@ -210,4 +213,41 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
   public String getRightId() {
     return rightId;
   }
+
+  @Override
+  public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+    // TODO: notify velox
+    super.prepareSnapshotPreBarrier(checkpointId);
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // TODO: implement it
+    task.snapshotState(0);
+    super.snapshotState(context);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    if (task == null) {
+      initGlutenTask();
+    }
+    // TODO: implement it
+    task.initializeState(0);
+    super.initializeState(context);
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    // TODO: notify velox
+    task.notifyCheckpointComplete(checkpointId);
+    super.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void notifyCheckpointAborted(long checkpointId) throws Exception {
+    // TODO: notify velox
+    task.notifyCheckpointAborted(checkpointId);
+    super.notifyCheckpointAborted(checkpointId);
+  }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index 7db368ae14..d4e33a17a1 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
@@ -103,7 +104,11 @@ public class LogicalTypeConverter {
                 Type keyType = toVLType(mapType.getKeyType());
                 Type valueType = toVLType(mapType.getValueType());
                 return 
io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType);
-              }));
+              }),
+          // TODO: may need to preserve the timestamp precision of the Flink type
+          Map.entry(
+              LocalZonedTimestampType.class,
+              logicalType -> new 
io.github.zhztheplayer.velox4j.type.TimestampType()));
 
   public static Type toVLType(LogicalType logicalType) {
     VLTypeConverter converter = converters.get(logicalType.getClass());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to