This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new be9f4f6cc6 [GLUTEN-10546][FLINK] Support all flink operators for
nexmark (#10548)
be9f4f6cc6 is described below
commit be9f4f6cc63aa7c6423b1a3c789740edabc0d09b
Author: shuai.xu <[email protected]>
AuthorDate: Thu Sep 4 17:17:02 2025 +0800
[GLUTEN-10546][FLINK] Support all flink operators for nexmark (#10548)
* [GLUTEN-10546][FLINK] Support all flink operators for nexmark
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
...upAggregate.java => StreamExecDeduplicate.java} | 203 +++++++++---------
.../stream/StreamExecGlobalWindowAggregate.java | 4 +-
.../exec/stream/StreamExecGroupAggregate.java | 26 ++-
...te.java => StreamExecGroupWindowAggregate.java} | 234 ++++++++-------------
.../stream/StreamExecLocalWindowAggregate.java | 3 +-
.../plan/nodes/exec/stream/StreamExecRank.java | 63 +++++-
...gregate.java => StreamExecWindowAggregate.java} | 90 +++-----
.../org/apache/gluten/rexnode/WindowUtils.java | 4 +
.../rexnode/functions/BaseRexCallConverters.java | 4 +
.../rexnode/functions/RexCallConverterFactory.java | 8 +-
.../operators/GlutenVectorOneInputOperator.java | 58 ++++-
.../operators/GlutenVectorSourceFunction.java | 69 +++++-
.../operators/GlutenVectorTwoInputOperator.java | 66 ++++--
.../apache/gluten/util/LogicalTypeConverter.java | 7 +-
16 files changed, 471 insertions(+), 372 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index ed579c0729..6c929887ce 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -43,7 +43,7 @@ jobs:
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
+ cd velox4j && git reset --hard
1cdeb1a8384967499919e655d55a66f2daa9d55c
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index b1fefc93e5..bfff2bee9b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard a5e3e9d7f11440f8c4eafeff88ae6945186d02c1
+git reset --hard 1cdeb1a8384967499919e655d55a66f2daa9d55c
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
similarity index 54%
copy from
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
copy to
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
index 9319bbc846..ed89b71774 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeduplicate.java
@@ -16,21 +16,20 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;
-import org.apache.gluten.rexnode.AggregateCallConverter;
-import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
-import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
-import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
+import io.github.zhztheplayer.velox4j.plan.DeduplicateNode;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamRankNode;
import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -39,125 +38,123 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.calcite.rel.core.AggregateCall;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE_ENABLED;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Stream {@link ExecNode} for unbounded group aggregate.
- *
- * <p>This node does support un-splittable aggregate function (e.g.
STDDEV_POP).
+ * Stream {@link ExecNode} which deduplicates on keys and keeps only first row
or last row. This node
+ * is an optimization of {@link StreamExecRank} for some special cases.
Compared to {@link
+ * StreamExecRank}, this node could use mini-batch and access less state.
*/
@ExecNodeMetadata(
- name = "stream-exec-group-aggregate",
+ name = "stream-exec-deduplicate",
version = 1,
- consumedOptions = {"table.exec.mini-batch.enabled",
"table.exec.mini-batch.size"},
- producedTransformations =
StreamExecGroupAggregate.GROUP_AGGREGATE_TRANSFORMATION,
+ consumedOptions = {
+ "table.exec.mini-batch.enabled",
+ "table.exec.mini-batch.size",
+ "table.exec.deduplicate.insert-update-after-sensitive-enabled",
+ "table.exec.deduplicate.mini-batch.compact-changes-enabled"
+ },
+ producedTransformations = StreamExecDeduplicate.DEDUPLICATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGroupAggregate extends StreamExecAggregateBase {
-
- private static final Logger LOG =
LoggerFactory.getLogger(StreamExecGroupAggregate.class);
+public class StreamExecDeduplicate extends ExecNodeBase<RowData>
+ implements StreamExecNode<RowData>,
SingleTransformationTranslator<RowData> {
- public static final String GROUP_AGGREGATE_TRANSFORMATION =
"group-aggregate";
+ public static final String DEDUPLICATE_TRANSFORMATION = "deduplicate";
- public static final String STATE_NAME = "groupAggregateState";
+ public static final String FIELD_NAME_UNIQUE_KEYS = "uniqueKeys";
+ public static final String FIELD_NAME_IS_ROWTIME = "isRowtime";
+ public static final String FIELD_NAME_KEEP_LAST_ROW = "keepLastRow";
+ public static final String FIELD_NAME_GENERATE_UPDATE_BEFORE =
"generateUpdateBefore";
+ public static final String STATE_NAME = "deduplicateState";
- @JsonProperty(FIELD_NAME_GROUPING)
- private final int[] grouping;
+ @JsonProperty(FIELD_NAME_UNIQUE_KEYS)
+ private final int[] uniqueKeys;
- @JsonProperty(FIELD_NAME_AGG_CALLS)
- private final AggregateCall[] aggCalls;
+ @JsonProperty(FIELD_NAME_IS_ROWTIME)
+ private final boolean isRowtime;
- /** Each element indicates whether the corresponding agg call needs
`retract` method. */
- @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS)
- private final boolean[] aggCallNeedRetractions;
+ @JsonProperty(FIELD_NAME_KEEP_LAST_ROW)
+ private final boolean keepLastRow;
- /** Whether this node will generate UPDATE_BEFORE messages. */
@JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE)
private final boolean generateUpdateBefore;
- /** Whether this node consumes retraction messages. */
- @JsonProperty(FIELD_NAME_NEED_RETRACTION)
- private final boolean needRetraction;
-
@Nullable
@JsonProperty(FIELD_NAME_STATE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<StateMetadata> stateMetadataList;
- public StreamExecGroupAggregate(
+ public StreamExecDeduplicate(
ReadableConfig tableConfig,
- int[] grouping,
- AggregateCall[] aggCalls,
- boolean[] aggCallNeedRetractions,
+ int[] uniqueKeys,
+ boolean isRowtime,
+ boolean keepLastRow,
boolean generateUpdateBefore,
- boolean needRetraction,
- @Nullable Long stateTtlFromHint,
InputProperty inputProperty,
RowType outputType,
String description) {
this(
ExecNodeContext.newNodeId(),
- ExecNodeContext.newContext(StreamExecGroupAggregate.class),
- ExecNodeContext.newPersistedConfig(StreamExecGroupAggregate.class,
tableConfig),
- grouping,
- aggCalls,
- aggCallNeedRetractions,
+ ExecNodeContext.newContext(StreamExecDeduplicate.class),
+ ExecNodeContext.newPersistedConfig(StreamExecDeduplicate.class,
tableConfig),
+ uniqueKeys,
+ isRowtime,
+ keepLastRow,
generateUpdateBefore,
- needRetraction,
- StateMetadata.getOneInputOperatorDefaultMeta(stateTtlFromHint,
tableConfig, STATE_NAME),
+ StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
outputType,
description);
}
@JsonCreator
- public StreamExecGroupAggregate(
+ public StreamExecDeduplicate(
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
- @JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
- @JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
- @JsonProperty(FIELD_NAME_AGG_CALL_NEED_RETRACTIONS) boolean[]
aggCallNeedRetractions,
+ @JsonProperty(FIELD_NAME_UNIQUE_KEYS) int[] uniqueKeys,
+ @JsonProperty(FIELD_NAME_IS_ROWTIME) boolean isRowtime,
+ @JsonProperty(FIELD_NAME_KEEP_LAST_ROW) boolean keepLastRow,
@JsonProperty(FIELD_NAME_GENERATE_UPDATE_BEFORE) boolean
generateUpdateBefore,
- @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
@Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata>
stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType,
description);
- this.grouping = checkNotNull(grouping);
- this.aggCalls = checkNotNull(aggCalls);
- this.aggCallNeedRetractions = checkNotNull(aggCallNeedRetractions);
- checkArgument(aggCalls.length == aggCallNeedRetractions.length);
+ checkArgument(inputProperties.size() == 1);
+ this.uniqueKeys = checkNotNull(uniqueKeys);
+ this.isRowtime = isRowtime;
+ this.keepLastRow = keepLastRow;
this.generateUpdateBefore = generateUpdateBefore;
- this.needRetraction = needRetraction;
this.stateMetadataList = stateMetadataList;
}
@@ -165,72 +162,78 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
-
- final long stateRetentionTime =
- StateMetadata.getStateTtlForOneInputOperator(config,
stateMetadataList);
- if (grouping.length > 0 && stateRetentionTime < 0) {
- LOG.warn(
- "No state retention interval configured for a query which
accumulates state. "
- + "Please provide a query configuration with valid retention
interval to prevent excessive "
- + "state size. You may specify a retention time of 0 to not
clean up the state.");
- }
-
final ExecEdge inputEdge = getInputEdges().get(0);
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
+
final RowType inputRowType = (RowType) inputEdge.getOutputType();
+ final InternalTypeInfo<RowData> rowTypeInfo =
+ (InternalTypeInfo<RowData>) inputTransform.getOutputType();
+ final TypeSerializer<RowData> rowSerializer =
+
rowTypeInfo.createSerializer(planner.getExecEnv().getConfig().getSerializerConfig());
+ final OneInputStreamOperator operator;
+
+ long stateRetentionTime =
+ StateMetadata.getStateTtlForOneInputOperator(config,
stateMetadataList);
// --- Begin Gluten-specific code changes ---
io.github.zhztheplayer.velox4j.type.RowType inputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(inputRowType);
- List<FieldAccessTypedExpr> groupingKeys =
Utils.generateFieldAccesses(inputType, grouping);
- List<Aggregate> aggregates = AggregateCallConverter.toAggregates(aggCalls,
inputType);
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
- checkArgument(outputType.getNames().size() == grouping.length +
aggCalls.length);
- List<String> aggNames =
- outputType.getNames().stream()
- .skip(grouping.length)
- .limit(aggCalls.length)
- .collect(Collectors.toList());
- // TODO: velox agg may not equal to flink
- PlanNode aggregation =
- new AggregationNode(
- PlanNodeIdGenerator.newId(),
- AggregateStep.SINGLE,
- groupingKeys,
- groupingKeys,
- aggNames,
- aggregates,
- false,
- List.of(new EmptyNode(inputType)),
- null,
- List.of());
- final OneInputStreamOperator operator =
- new GlutenVectorOneInputOperator(
- new StatefulPlanNode(aggregation.getId(), aggregation),
- PlanNodeIdGenerator.newId(),
- inputType,
- Map.of(aggregation.getId(), outputType));
+ if (isRowtime) {
+ int rowtimeIndex = -1;
+ for (int i = 0; i < inputRowType.getFieldCount(); ++i) {
+ if (TypeCheckUtils.isRowTime(inputRowType.getTypeAt(i))) {
+ rowtimeIndex = i;
+ break;
+ }
+ }
+ boolean generateInsert =
+
config.get(TABLE_EXEC_DEDUPLICATE_INSERT_UPDATE_AFTER_SENSITIVE_ENABLED);
+ PlanNode deduplicateNode =
+ new DeduplicateNode(
+ PlanNodeIdGenerator.newId(),
+ List.of(new EmptyNode(inputType)), // sources
+ outputType,
+ stateRetentionTime,
+ rowtimeIndex,
+ generateUpdateBefore,
+ generateInsert,
+ keepLastRow);
+ List<Integer> keyIndexes =
Arrays.stream(uniqueKeys).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
+ final PlanNode streamRankNode =
+ new StreamRankNode(
+ PlanNodeIdGenerator.newId(),
+ List.of(new EmptyNode(inputType)), // sources
+ keySelectorSpec,
+ deduplicateNode,
+ outputType);
+ operator =
+ new GlutenVectorOneInputOperator(
+ new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
+ PlanNodeIdGenerator.newId(),
+ inputType,
+ Map.of(streamRankNode.getId(), outputType));
+ } else {
+ throw new RuntimeException("ProcTime in deduplicate is not supported.");
+ }
// --- End Gluten-specific code changes ---
- // partitioned aggregation
final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationMeta(GROUP_AGGREGATE_TRANSFORMATION, config),
+ createTransformationMeta(DEDUPLICATE_TRANSFORMATION, config),
operator,
- InternalTypeInfo.of(getOutputType()),
+ rowTypeInfo,
inputTransform.getParallelism(),
false);
- // set KeyType and Selector for state
final RowDataKeySelector selector =
KeySelectorUtil.getRowDataSelector(
- planner.getFlinkContext().getClassLoader(),
- grouping,
- InternalTypeInfo.of(inputRowType));
+ planner.getFlinkContext().getClassLoader(), uniqueKeys,
rowTypeInfo);
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index 6c12febf40..ee8c91a375 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -184,6 +184,7 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -250,7 +251,8 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
slide,
offset,
windowType,
- outputType);
+ outputType,
+ rowtimeIndex);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index 9319bbc846..6758111254 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -23,10 +23,11 @@ import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.GroupAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.GroupAggsHandlerNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
@@ -60,6 +61,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -194,19 +196,15 @@ public class StreamExecGroupAggregate extends
StreamExecAggregateBase {
.skip(grouping.length)
.limit(aggCalls.length)
.collect(Collectors.toList());
+ List<Integer> keyIndexes =
Arrays.stream(grouping).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
+ PlanNode aggsHandlerNode =
+ new GroupAggsHandlerNode(
+ PlanNodeIdGenerator.newId(), outputType, generateUpdateBefore,
needRetraction);
// TODO: velox agg may not equal to flink
PlanNode aggregation =
- new AggregationNode(
- PlanNodeIdGenerator.newId(),
- AggregateStep.SINGLE,
- groupingKeys,
- groupingKeys,
- aggNames,
- aggregates,
- false,
- List.of(new EmptyNode(inputType)),
- null,
- List.of());
+ new GroupAggregationNode(
+ PlanNodeIdGenerator.newId(), aggsHandlerNode, keySelectorSpec,
outputType);
final OneInputStreamOperator operator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(aggregation.getId(), aggregation),
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
similarity index 57%
copy from
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
copy to
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index 6c12febf40..a2ce1425ea 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -18,35 +18,32 @@ package
org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.gluten.rexnode.AggregateCallConverter;
import org.apache.gluten.rexnode.Utils;
-import org.apache.gluten.rexnode.WindowUtils;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import io.github.zhztheplayer.velox4j.aggregate.Aggregate;
-import io.github.zhztheplayer.velox4j.aggregate.AggregateStep;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
-import io.github.zhztheplayer.velox4j.plan.AggregationNode;
-import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.GroupWindowAggregationNode;
+import io.github.zhztheplayer.velox4j.plan.GroupWindowAggsHandlerNode;
import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
-import io.github.zhztheplayer.velox4j.plan.StreamWindowAggregationNode;
import io.github.zhztheplayer.velox4j.plan.StreamWindowPartitionFunctionSpec;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
+import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
@@ -54,55 +51,59 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
-import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
+import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
-import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.runtime.util.TimeWindowUtil;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
-import org.apache.commons.math3.util.ArithmeticUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
-import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.TimeZone;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.planner.plan.utils.AggregateUtil.hasRowIntervalType;
+import static
org.apache.flink.table.planner.plan.utils.AggregateUtil.isRowtimeAttribute;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** Stream {@link ExecNode} for window table-valued based global aggregate. */
+/**
+ * Stream {@link ExecNode} for either group window aggregate or group window
table aggregate.
+ *
+ * <p>The differences between {@link StreamExecWindowAggregate} and {@link
+ * StreamExecGroupWindowAggregate} is that, this node is translated from the
legacy GROUP WINDOW FUNCTION syntax, but the
+ * other is from window TVF syntax. In the long
future, {@link
+ * StreamExecGroupWindowAggregate} will be dropped.
+ */
@ExecNodeMetadata(
- name = "stream-exec-global-window-aggregate",
+ name = "stream-exec-group-window-aggregate",
version = 1,
- consumedOptions = "table.local-time-zone",
- producedTransformations =
- StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
+ consumedOptions = {
+ "table.local-time-zone",
+ "table.exec.mini-batch.enabled",
+ "table.exec.mini-batch.size"
+ },
+ producedTransformations =
StreamExecGroupWindowAggregate.GROUP_WINDOW_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBase {
+public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(StreamExecGroupWindowAggregate.class);
- public static final String GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION =
"global-window-aggregate";
+ public static final String GROUP_WINDOW_AGGREGATE_TRANSFORMATION =
"group-window-aggregate";
- public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE =
"localAggInputRowType";
+ public static final String FIELD_NAME_WINDOW = "window";
+ public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES =
"namedWindowProperties";
@JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
@@ -110,80 +111,101 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
@JsonProperty(FIELD_NAME_AGG_CALLS)
private final AggregateCall[] aggCalls;
- @JsonProperty(FIELD_NAME_WINDOWING)
- private final WindowingStrategy windowing;
+ @JsonProperty(FIELD_NAME_WINDOW)
+ private final LogicalWindow window;
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
private final NamedWindowProperty[] namedWindowProperties;
- /** The input row type of this node's local agg. */
- @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
- private final RowType localAggInputRowType;
-
@JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;
- public StreamExecGlobalWindowAggregate(
+ public StreamExecGroupWindowAggregate(
ReadableConfig tableConfig,
int[] grouping,
AggregateCall[] aggCalls,
- WindowingStrategy windowing,
+ LogicalWindow window,
NamedWindowProperty[] namedWindowProperties,
- Boolean needRetraction,
+ boolean needRetraction,
InputProperty inputProperty,
- RowType localAggInputRowType,
RowType outputType,
String description) {
this(
ExecNodeContext.newNodeId(),
- ExecNodeContext.newContext(StreamExecGlobalWindowAggregate.class),
-
ExecNodeContext.newPersistedConfig(StreamExecGlobalWindowAggregate.class,
tableConfig),
+ ExecNodeContext.newContext(StreamExecGroupWindowAggregate.class),
+
ExecNodeContext.newPersistedConfig(StreamExecGroupWindowAggregate.class,
tableConfig),
grouping,
aggCalls,
- windowing,
+ window,
namedWindowProperties,
needRetraction,
Collections.singletonList(inputProperty),
- localAggInputRowType,
outputType,
description);
}
@JsonCreator
- public StreamExecGlobalWindowAggregate(
+ public StreamExecGroupWindowAggregate(
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@JsonProperty(FIELD_NAME_GROUPING) int[] grouping,
@JsonProperty(FIELD_NAME_AGG_CALLS) AggregateCall[] aggCalls,
- @JsonProperty(FIELD_NAME_WINDOWING) WindowingStrategy windowing,
+ @JsonProperty(FIELD_NAME_WINDOW) LogicalWindow window,
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) NamedWindowProperty[]
namedWindowProperties,
- @Nullable @JsonProperty(FIELD_NAME_NEED_RETRACTION) Boolean
needRetraction,
+ @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
- @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType
localAggInputRowType,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType,
description);
+ checkArgument(inputProperties.size() == 1);
this.grouping = checkNotNull(grouping);
this.aggCalls = checkNotNull(aggCalls);
- this.windowing = checkNotNull(windowing);
+ this.window = checkNotNull(window);
this.namedWindowProperties = checkNotNull(namedWindowProperties);
- this.localAggInputRowType = localAggInputRowType;
- this.needRetraction = Optional.ofNullable(needRetraction).orElse(false);
+ this.needRetraction = needRetraction;
}
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
+ final boolean isCountWindow;
+ if (window instanceof TumblingGroupWindow) {
+ isCountWindow = hasRowIntervalType(((TumblingGroupWindow)
window).size());
+ } else if (window instanceof SlidingGroupWindow) {
+ isCountWindow = hasRowIntervalType(((SlidingGroupWindow) window).size());
+ } else {
+ isCountWindow = false;
+ }
+
+ if (isCountWindow && grouping.length > 0 && config.getStateRetentionTime()
< 0) {
+ LOGGER.warn(
+ "No state retention interval configured for a query which
accumulates state. "
+ + "Please provide a query configuration with valid retention
interval to prevent "
+ + "excessive state size. You may specify a retention time of 0
to not clean up the state.");
+ }
+
final ExecEdge inputEdge = getInputEdges().get(0);
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
final RowType inputRowType = (RowType) inputEdge.getOutputType();
- final ZoneId shiftTimeZone =
- TimeWindowUtil.getShiftTimeZone(
- windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+ final int inputTimeFieldIndex;
+ if (isRowtimeAttribute(window.timeAttribute())) {
+ inputTimeFieldIndex = window.timeAttribute().getFieldIndex();
+ if (inputTimeFieldIndex < 0) {
+ throw new TableException(
+ "Group window must defined on a time attribute, "
+ + "but the time attribute can't be found.\n"
+ + "This should never happen. Please file an issue.");
+ }
+ } else {
+ inputTimeFieldIndex = -1;
+ }
+
+ WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(config, window);
+
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -202,54 +224,20 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
List<Integer> keyIndexes =
Arrays.stream(grouping).boxed().collect(Collectors.toList());
PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
// TODO: support more window types.
- Tuple5<Long, Long, Long, Integer, Integer> windowSpecParams =
- WindowUtils.extractWindowParameters(windowing);
- long size = windowSpecParams.f0;
- long slide = windowSpecParams.f1;
- long offset = windowSpecParams.f2;
- int rowtimeIndex = windowSpecParams.f3;
- int windowType = windowSpecParams.f4;
PartitionFunctionSpec sliceAssignerSpec =
- new StreamWindowPartitionFunctionSpec(
- inputType, rowtimeIndex, size, slide, offset, windowType);
- PlanNode aggregation =
- new AggregationNode(
- PlanNodeIdGenerator.newId(),
- AggregateStep.SINGLE,
- groupingKeys,
- groupingKeys,
- aggNames,
- aggregates,
- false,
- List.of(new EmptyNode(inputType)),
- null,
- List.of());
- PlanNode localAgg =
- new AggregationNode(
- PlanNodeIdGenerator.newId(),
- AggregateStep.SINGLE,
- groupingKeys,
- groupingKeys,
- aggNames,
- aggregates,
- false,
- List.of(new EmptyNode(inputType)),
- null,
- List.of());
+ new StreamWindowPartitionFunctionSpec(inputType, inputTimeFieldIndex,
0L, 0L, 0L, 1);
+ PlanNode aggregation = new
GroupWindowAggsHandlerNode(PlanNodeIdGenerator.newId(), outputType);
PlanNode windowAgg =
- new StreamWindowAggregationNode(
+ new GroupWindowAggregationNode(
PlanNodeIdGenerator.newId(),
aggregation,
- localAgg,
keySelectorSpec,
sliceAssignerSpec,
- ArithmeticUtils.gcd(size, slide),
- TimeZone.getTimeZone(shiftTimeZone).useDaylightTime(),
- false,
- size,
- slide,
- offset,
- windowType,
+ emitStrategy.getAllowLateness(),
+ emitStrategy.produceUpdates(),
+ inputTimeFieldIndex,
+ true, // TODO: get from window attributes
+ 1, // TODO: get from window attributes
outputType);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
@@ -259,61 +247,23 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
Map.of(windowAgg.getId(), outputType));
// --- End Gluten-specific code changes ---
- final RowDataKeySelector selector =
- KeySelectorUtil.getRowDataSelector(
- planner.getFlinkContext().getClassLoader(),
- grouping,
- InternalTypeInfo.of(inputRowType));
-
final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
config),
+ createTransformationMeta(GROUP_WINDOW_AGGREGATE_TRANSFORMATION,
config),
SimpleOperatorFactory.of(windowOperator),
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism(),
- WINDOW_AGG_MEMORY_RATIO,
false);
// set KeyType and Selector for state
+ final RowDataKeySelector selector =
+ KeySelectorUtil.getRowDataSelector(
+ planner.getFlinkContext().getClassLoader(),
+ grouping,
+ InternalTypeInfo.of(inputRowType));
transform.setStateKeySelector(selector);
transform.setStateKeyType(selector.getProducedType());
return transform;
}
-
- private GeneratedNamespaceAggsHandleFunction<Long> createAggsHandler(
- String name,
- SliceAssigner sliceAssigner,
- AggregateInfoList aggInfoList,
- int mergedAccOffset,
- boolean mergedAccIsOnHeap,
- DataType[] mergedAccExternalTypes,
- ExecNodeConfig config,
- ClassLoader classLoader,
- RelBuilder relBuilder,
- ZoneId shifTimeZone) {
- final AggsHandlerCodeGenerator generator =
- new AggsHandlerCodeGenerator(
- new CodeGeneratorContext(config, classLoader),
- relBuilder,
-
JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
- true) // copyInputField
- .needAccumulate()
- .needMerge(mergedAccOffset, mergedAccIsOnHeap,
mergedAccExternalTypes);
-
- final List<WindowProperty> windowProperties =
- Arrays.asList(
- Arrays.stream(namedWindowProperties)
- .map(NamedWindowProperty::getProperty)
- .toArray(WindowProperty[]::new));
-
- return generator.generateNamespaceAggsHandler(
- name,
- aggInfoList,
- JavaScalaConversionUtil.toScala(windowProperties),
- sliceAssigner,
- // we use window end timestamp to indicate a slicing window, see
SliceAssigner
- Long.class,
- shifTimeZone);
- }
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 30c80e8c7c..4ffcf7998c 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -219,7 +219,8 @@ public class StreamExecLocalWindowAggregate extends
StreamExecWindowAggregateBas
slide,
offset,
windowType,
- outputType);
+ outputType,
+ rowtimeIndex);
final OneInputStreamOperator localAggOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
index 6099f3dfb8..c1b87e8f60 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
@@ -23,8 +23,13 @@ import org.apache.gluten.util.PlanNodeIdGenerator;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.HashPartitionFunctionSpec;
+import io.github.zhztheplayer.velox4j.plan.PartitionFunctionSpec;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.StreamRankNode;
+import io.github.zhztheplayer.velox4j.plan.StreamTopNNode;
+import io.github.zhztheplayer.velox4j.plan.TopNNode;
import io.github.zhztheplayer.velox4j.plan.TopNRowNumberNode;
import io.github.zhztheplayer.velox4j.sort.SortOrder;
@@ -64,10 +69,13 @@ import javax.annotation.Nullable;
import javax.swing.*;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RANK_TOPN_CACHE_SIZE;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -200,6 +208,7 @@ public class StreamExecRank extends ExecNodeBase<RowData>
RowType inputRowType = (RowType) inputEdge.getOutputType();
InternalTypeInfo<RowData> inputRowTypeInfo =
InternalTypeInfo.of(inputRowType);
int[] sortFields = sortSpec.getFieldIndices();
+ long cacheSize = config.get(TABLE_EXEC_RANK_TOPN_CACHE_SIZE);
// --- Begin Gluten-specific code changes ---
io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -215,21 +224,55 @@ public class StreamExecRank extends ExecNodeBase<RowData>
LogicalTypeConverter.toVLType(getOutputType());
// TODO: velox RowNumber may not equal to flink
int limit = 1;
- final PlanNode rowNumberNode =
- new TopNRowNumberNode(
+ final PlanNode topNNode;
+ if (outputRankNumber) {
+ topNNode =
+ new TopNRowNumberNode(
+ PlanNodeIdGenerator.newId(),
+ partitionKeys,
+ sortKeys,
+ sortOrders,
+ null,
+ limit,
+ List.of(new EmptyNode(inputType)));
+ } else {
+ topNNode =
+ new TopNNode(
+ PlanNodeIdGenerator.newId(),
+ sortKeys,
+ sortOrders,
+ limit,
+ false,
+ List.of(new EmptyNode(inputType)));
+ }
+ List<Integer> keyIndexes =
Arrays.stream(partitionFields).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec keySelectorSpec = new
HashPartitionFunctionSpec(inputType, keyIndexes);
+ List<Integer> sortKeyIndexes =
Arrays.stream(sortFields).boxed().collect(Collectors.toList());
+ PartitionFunctionSpec sortKeySelectorSpec =
+ new HashPartitionFunctionSpec(inputType, sortKeyIndexes);
+ final PlanNode streamTopNNode =
+ new StreamTopNNode(
+ PlanNodeIdGenerator.newId(),
+ List.of(new EmptyNode(inputType)), // sources
+ topNNode,
+ sortKeySelectorSpec,
+ outputType,
+ generateUpdateBefore,
+ outputRankNumber,
+ cacheSize);
+ final PlanNode streamRankNode =
+ new StreamRankNode(
PlanNodeIdGenerator.newId(),
- partitionKeys,
- sortKeys,
- sortOrders,
- null,
- limit,
- List.of(new EmptyNode(inputType)));
+ List.of(new EmptyNode(inputType)), // sources
+ keySelectorSpec,
+ streamTopNNode,
+ outputType);
final OneInputStreamOperator operator =
new GlutenVectorOneInputOperator(
- new StatefulPlanNode(rowNumberNode.getId(), rowNumberNode),
+ new StatefulPlanNode(streamRankNode.getId(), streamRankNode),
PlanNodeIdGenerator.newId(),
inputType,
- Map.of(rowNumberNode.getId(), outputType));
+ Map.of(streamRankNode.getId(), outputType));
// --- End Gluten-specific code changes ---
OneInputTransformation<RowData, RowData> transform =
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
similarity index 77%
copy from
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
copy to
gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 6c12febf40..afe5d01483 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -43,8 +43,6 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -54,25 +52,18 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
-import
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
-import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.math3.util.ArithmeticUtils;
import javax.annotation.Nullable;
@@ -89,20 +80,29 @@ import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
-/** Stream {@link ExecNode} for window table-valued based global aggregate. */
+/**
+ * Stream {@link ExecNode} for window table-valued based aggregate.
+ *
+ * <p>The difference between {@link StreamExecWindowAggregate} and {@link
+ * StreamExecGroupWindowAggregate} is that this node is translated from
window TVF syntax, while the
+ * other is from the legacy GROUP WINDOW FUNCTION syntax. In the future,
{@link
+ * StreamExecGroupWindowAggregate} will be dropped.
+ */
@ExecNodeMetadata(
- name = "stream-exec-global-window-aggregate",
+ name = "stream-exec-window-aggregate",
version = 1,
consumedOptions = "table.local-time-zone",
- producedTransformations =
- StreamExecGlobalWindowAggregate.GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
+ producedTransformations =
StreamExecWindowAggregate.WINDOW_AGGREGATE_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBase {
+public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
+
+ public static final String WINDOW_AGGREGATE_TRANSFORMATION =
"window-aggregate";
- public static final String GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION =
"global-window-aggregate";
+ private static final long WINDOW_AGG_MEMORY_RATIO = 100;
- public static final String FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE =
"localAggInputRowType";
+ public static final String FIELD_NAME_WINDOWING = "windowing";
+ public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES =
"namedWindowProperties";
@JsonProperty(FIELD_NAME_GROUPING)
private final int[] grouping;
@@ -116,14 +116,10 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
private final NamedWindowProperty[] namedWindowProperties;
- /** The input row type of this node's local agg. */
- @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
- private final RowType localAggInputRowType;
-
@JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;
- public StreamExecGlobalWindowAggregate(
+ public StreamExecWindowAggregate(
ReadableConfig tableConfig,
int[] grouping,
AggregateCall[] aggCalls,
@@ -131,26 +127,24 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
NamedWindowProperty[] namedWindowProperties,
Boolean needRetraction,
InputProperty inputProperty,
- RowType localAggInputRowType,
RowType outputType,
String description) {
this(
ExecNodeContext.newNodeId(),
- ExecNodeContext.newContext(StreamExecGlobalWindowAggregate.class),
-
ExecNodeContext.newPersistedConfig(StreamExecGlobalWindowAggregate.class,
tableConfig),
+ ExecNodeContext.newContext(StreamExecWindowAggregate.class),
+ ExecNodeContext.newPersistedConfig(StreamExecWindowAggregate.class,
tableConfig),
grouping,
aggCalls,
windowing,
namedWindowProperties,
needRetraction,
Collections.singletonList(inputProperty),
- localAggInputRowType,
outputType,
description);
}
@JsonCreator
- public StreamExecGlobalWindowAggregate(
+ public StreamExecWindowAggregate(
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@@ -160,7 +154,6 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) NamedWindowProperty[]
namedWindowProperties,
@Nullable @JsonProperty(FIELD_NAME_NEED_RETRACTION) Boolean
needRetraction,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
- @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE) RowType
localAggInputRowType,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType,
description);
@@ -168,7 +161,6 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
this.aggCalls = checkNotNull(aggCalls);
this.windowing = checkNotNull(windowing);
this.namedWindowProperties = checkNotNull(namedWindowProperties);
- this.localAggInputRowType = localAggInputRowType;
this.needRetraction = Optional.ofNullable(needRetraction).orElse(false);
}
@@ -184,6 +176,7 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
final ZoneId shiftTimeZone =
TimeWindowUtil.getShiftTimeZone(
windowing.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
+
// --- Begin Gluten-specific code changes ---
// TODO: velox window not equal to flink window.
io.github.zhztheplayer.velox4j.type.RowType inputType =
@@ -250,7 +243,8 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
slide,
offset,
windowType,
- outputType);
+ outputType,
+ rowtimeIndex);
final OneInputStreamOperator windowOperator =
new GlutenVectorOneInputOperator(
new StatefulPlanNode(windowAgg.getId(), windowAgg),
@@ -268,7 +262,7 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
final OneInputTransformation<RowData, RowData> transform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION,
config),
+ createTransformationMeta(WINDOW_AGGREGATE_TRANSFORMATION, config),
SimpleOperatorFactory.of(windowOperator),
InternalTypeInfo.of(getOutputType()),
inputTransform.getParallelism(),
@@ -280,40 +274,4 @@ public class StreamExecGlobalWindowAggregate extends
StreamExecWindowAggregateBa
transform.setStateKeyType(selector.getProducedType());
return transform;
}
-
- private GeneratedNamespaceAggsHandleFunction<Long> createAggsHandler(
- String name,
- SliceAssigner sliceAssigner,
- AggregateInfoList aggInfoList,
- int mergedAccOffset,
- boolean mergedAccIsOnHeap,
- DataType[] mergedAccExternalTypes,
- ExecNodeConfig config,
- ClassLoader classLoader,
- RelBuilder relBuilder,
- ZoneId shifTimeZone) {
- final AggsHandlerCodeGenerator generator =
- new AggsHandlerCodeGenerator(
- new CodeGeneratorContext(config, classLoader),
- relBuilder,
-
JavaScalaConversionUtil.toScala(localAggInputRowType.getChildren()),
- true) // copyInputField
- .needAccumulate()
- .needMerge(mergedAccOffset, mergedAccIsOnHeap,
mergedAccExternalTypes);
-
- final List<WindowProperty> windowProperties =
- Arrays.asList(
- Arrays.stream(namedWindowProperties)
- .map(NamedWindowProperty::getProperty)
- .toArray(WindowProperty[]::new));
-
- return generator.generateNamespaceAggsHandler(
- name,
- aggInfoList,
- JavaScalaConversionUtil.toScala(windowProperties),
- sliceAssigner,
- // we use window end timestamp to indicate a slicing window, see
SliceAssigner
- Long.class,
- shifTimeZone);
- }
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
index 60b6f71fa9..29cd64c30b 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/WindowUtils.java
@@ -59,6 +59,8 @@ public class WindowUtils {
if (windowOffset != null) {
offset = windowOffset.toMillis();
}
+ } else {
+ throw new RuntimeException("Not support window spec " + windowSpec);
}
if (windowing instanceof TimeAttributeWindowingStrategy &&
windowing.isRowtime()) {
@@ -69,6 +71,8 @@ public class WindowUtils {
windowType = 1;
} else if (windowing instanceof SliceAttachedWindowingStrategy) {
rowtimeIndex = ((SliceAttachedWindowingStrategy)
windowing).getSliceEnd();
+ } else {
+ throw new RuntimeException("Not support window strategy " + windowing);
}
return new Tuple5<Long, Long, Long, Integer, Integer>(
size, slide, offset, rowtimeIndex, windowType);
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
index fa2385aaf1..fdc1ac5370 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/BaseRexCallConverters.java
@@ -23,6 +23,7 @@ import org.apache.gluten.rexnode.ValidationResult;
import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
import io.github.zhztheplayer.velox4j.type.TimestampType;
import io.github.zhztheplayer.velox4j.type.Type;
@@ -80,6 +81,9 @@ class DefaultRexCallConverter extends BaseRexCallConverter {
if (sourceType instanceof TimestampType && resultType instanceof
TimestampType) {
return sourceExpr;
}
+ } else if ("unix_timestamp".equals(functionName)) {
+ // TODO: this is a workaround for Nexmark q12; refine it.
+ resultType = new BigIntType();
}
return new CallTypedExpr(resultType, params, functionName);
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 38e48cea9b..df7a6e8a90 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -91,7 +91,13 @@ public class RexCallConverterFactory {
() -> new
BasicArithmeticOperatorRexCallConverter("lessthanorequal"),
() -> new StringCompareRexCallConverter("lessthanorequal"),
() -> new
StringNumberCompareRexCallConverter("lessthanorequal"),
- () -> new
TimestampIntervalRexCallConverter("lessthanorequal"))));
+ () -> new
TimestampIntervalRexCallConverter("lessthanorequal"))),
+ Map.entry("PROCTIME", Arrays.asList(() -> new
DefaultRexCallConverter("unix_timestamp"))),
+ Map.entry("OR", Arrays.asList(() -> new
DefaultRexCallConverter("or"))),
+ Map.entry("IS NOT NULL", Arrays.asList(() -> new
DefaultRexCallConverter("is_not_null"))),
+ Map.entry(
+ "REGEXP_EXTRACT", Arrays.asList(() -> new
DefaultRexCallConverter("regexp_extract"))),
+ Map.entry("LOWER", Arrays.asList(() -> new
DefaultRexCallConverter("lower"))));
public static RexCallConverter getConverter(RexCall callNode,
RexConversionContext context) {
String operatorName = callNode.getOperator().getName();
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index fe4ed41966..5319c61b79 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -39,13 +39,13 @@ import
io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
import io.github.zhztheplayer.velox4j.type.RowType;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +69,6 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
private Session session;
private Query query;
private ExternalStreams.BlockingQueue inputQueue;
- private BufferAllocator allocator;
private SerialTask task;
public GlutenVectorOneInputOperator(
@@ -80,14 +79,9 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
this.outputTypes = outputTypes;
}
- @Override
- public void open() throws Exception {
- super.open();
- outElement = new StreamRecord(null);
+ void initGlutenTask() {
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
-
- inputQueue = session.externalStreamOps().newBlockingQueue();
// add a mock input as velox not allow the source is empty.
StatefulPlanNode mockInput =
new StatefulPlanNode(
@@ -101,8 +95,14 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
LOG.debug("OutTypes: {}", outputTypes.keySet());
query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
- allocator = new RootAllocator(Long.MAX_VALUE);
task = session.queryOps().execute(query);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ outElement = new StreamRecord(null);
+ inputQueue = session.externalStreamOps().newBlockingQueue();
ExternalStreamConnectorSplit split =
new ExternalStreamConnectorSplit("connector-external-stream",
inputQueue.id());
task.addSplit(id, split);
@@ -138,7 +138,6 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
task.close();
session.close();
memoryManager.close();
- allocator.close();
}
@Override
@@ -160,4 +159,41 @@ public class GlutenVectorOneInputOperator extends
TableStreamOperator<StatefulRe
public String getId() {
return id;
}
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ // TODO: notify velox
+ super.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // TODO: implement it
+ task.snapshotState(0);
+ super.snapshotState(context);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ if (task == null) {
+ initGlutenTask();
+ }
+ // TODO: implement it
+ task.initializeState(0);
+ super.initializeState(context);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // TODO: notify velox
+ task.notifyCheckpointComplete(checkpointId);
+ super.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ // TODO: notify velox
+ task.notifyCheckpointAborted(checkpointId);
+ super.notifyCheckpointAborted(checkpointId);
+ }
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 4224c0aaa6..1bbd8993ba 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -31,6 +31,11 @@ import io.github.zhztheplayer.velox4j.session.Session;
import io.github.zhztheplayer.velox4j.stateful.StatefulElement;
import io.github.zhztheplayer.velox4j.type.RowType;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -45,7 +50,8 @@ import java.util.Map;
* Gluten legacy source function, call velox plan to execute. It sends
RowVector to downstream
* instead of RowData to avoid data convert.
*/
-public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<StatefulElement> {
+public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<StatefulElement>
+ implements CheckpointedFunction, CheckpointListener {
private static final Logger LOG =
LoggerFactory.getLogger(GlutenVectorSourceFunction.class);
private final StatefulPlanNode planNode;
@@ -56,8 +62,9 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
private Session session;
private Query query;
- BufferAllocator allocator;
+ private BufferAllocator allocator;
private MemoryManager memoryManager;
+ private SerialTask task;
public GlutenVectorSourceFunction(
StatefulPlanNode planNode,
@@ -87,17 +94,24 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
return split;
}
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ System.out.println("InitializeState GlutenSourceFunction");
+ if (memoryManager == null) {
+ memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ session = Velox4j.newSession(memoryManager);
+ query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+ allocator = new RootAllocator(Long.MAX_VALUE);
+
+ task = session.queryOps().execute(query);
+ task.addSplit(id, split);
+ task.noMoreSplits(id);
+ }
+ }
+
@Override
public void run(SourceContext<StatefulElement> sourceContext) throws
Exception {
LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
- memoryManager = MemoryManager.create(AllocationListener.NOOP);
- session = Velox4j.newSession(memoryManager);
- query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
- allocator = new RootAllocator(Long.MAX_VALUE);
-
- SerialTask task = session.queryOps().execute(query);
- task.addSplit(id, split);
- task.noMoreSplits(id);
while (isRunning) {
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
@@ -126,4 +140,39 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
public void cancel() {
isRunning = false;
}
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // TODO: implement it
+ this.task.snapshotState(0);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+ if (memoryManager == null) {
+ memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ session = Velox4j.newSession(memoryManager);
+ query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+ allocator = new RootAllocator(Long.MAX_VALUE);
+
+ task = session.queryOps().execute(query);
+ task.addSplit(id, split);
+ task.noMoreSplits(id);
+ }
+ System.out.println("InitializeState GlutenSourceFunction");
+ // TODO: implement it
+ this.task.initializeState(0);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // TODO: notify velox
+ this.task.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ // TODO: notify velox
+ this.task.notifyCheckpointAborted(checkpointId);
+ }
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
index d10fcfa05d..04ed555a3b 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
@@ -37,13 +37,13 @@ import
io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
import io.github.zhztheplayer.velox4j.type.RowType;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +73,6 @@ public class GlutenVectorTwoInputOperator extends
AbstractStreamOperator<Statefu
private Query query;
private ExternalStreams.BlockingQueue leftInputQueue;
private ExternalStreams.BlockingQueue rightInputQueue;
- private BufferAllocator allocator;
private SerialTask task;
public GlutenVectorTwoInputOperator(
@@ -91,21 +90,26 @@ public class GlutenVectorTwoInputOperator extends
AbstractStreamOperator<Statefu
this.outputTypes = outputTypes;
}
+ // initializeState is called before open, so need to init gluten task first.
+ private void initGlutenTask() {
+ memoryManager = MemoryManager.create(AllocationListener.NOOP);
+ session = Velox4j.newSession(memoryManager);
+ query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
+ task = session.queryOps().execute(query);
+ LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
+ LOG.debug("OutTypes: {}", outputTypes.keySet());
+ LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
+ }
+
@Override
public void open() throws Exception {
super.open();
+ if (task == null) {
+ initGlutenTask();
+ }
outElement = new StreamRecord(null);
- memoryManager = MemoryManager.create(AllocationListener.NOOP);
- session = Velox4j.newSession(memoryManager);
-
leftInputQueue = session.externalStreamOps().newBlockingQueue();
rightInputQueue = session.externalStreamOps().newBlockingQueue();
- LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
- LOG.debug("OutTypes: {}", outputTypes.keySet());
- LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
- query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
- allocator = new RootAllocator(Long.MAX_VALUE);
- task = session.queryOps().execute(query);
ExternalStreamConnectorSplit leftSplit =
new ExternalStreamConnectorSplit("connector-external-stream",
leftInputQueue.id());
ExternalStreamConnectorSplit rightSplit =
@@ -172,7 +176,6 @@ public class GlutenVectorTwoInputOperator extends
AbstractStreamOperator<Statefu
task.close();
session.close();
memoryManager.close();
- allocator.close();
}
@Override
@@ -210,4 +213,41 @@ public class GlutenVectorTwoInputOperator extends
AbstractStreamOperator<Statefu
public String getRightId() {
return rightId;
}
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ // TODO: notify velox
+ super.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // TODO: implement it
+ task.snapshotState(0);
+ super.snapshotState(context);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ if (task == null) {
+ initGlutenTask();
+ }
+ // TODO: implement it
+ task.initializeState(0);
+ super.initializeState(context);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // TODO: notify velox
+ task.notifyCheckpointComplete(checkpointId);
+ super.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ // TODO: notify velox
+ task.notifyCheckpointAborted(checkpointId);
+ super.notifyCheckpointAborted(checkpointId);
+ }
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index 7db368ae14..d4e33a17a1 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -27,6 +27,7 @@ import
org.apache.flink.table.types.logical.DayTimeIntervalType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
@@ -103,7 +104,11 @@ public class LogicalTypeConverter {
Type keyType = toVLType(mapType.getKeyType());
Type valueType = toVLType(mapType.getValueType());
return
io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType);
- }));
+ }),
+ // TODO: may need precision
+ Map.entry(
+ LocalZonedTimestampType.class,
+ logicalType -> new
io.github.zhztheplayer.velox4j.type.TimestampType()));
public static Type toVLType(LogicalType logicalType) {
VLTypeConverter converter = converters.get(logicalType.getClass());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]