This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7f3bc4f172 [GLUTEN-10361][FLINK] Fix UT failure between the conversion
of `BinaryRowData` and `StatefulRecord` (#10362)
7f3bc4f172 is described below
commit 7f3bc4f17240cdf1a41245641d743d84c6d6e4b8
Author: kevinyhzou <[email protected]>
AuthorDate: Tue Sep 9 12:07:42 2025 +0800
[GLUTEN-10361][FLINK] Fix UT failure between the conversion of
`BinaryRowData` and `StatefulRecord` (#10362)
* fix ut by use VectorSource and PrintSink
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../plan/nodes/exec/common/CommonExecSink.java | 10 +-
.../plan/nodes/exec/stream/StreamExecExchange.java | 2 +-
.../exec/stream/StreamExecTableSourceScan.java | 112 +++++++++++++++++++++
.../org/apache/gluten/velox/VeloxSinkBuilder.java | 112 +++++++++++++++++++++
.../apache/gluten/velox/VeloxSourceBuilder.java | 109 ++++++++++++++++++++
.../GlutenKeyGroupStreamPartitioner.java | 4 +-
.../operators/GlutenVectorSourceFunction.java | 2 -
.../java/org/apache/gluten/util/ReflectUtils.java | 6 +-
.../gluten/vectorized/ArrowVectorAccessor.java | 2 -
.../stream/common/GlutenStreamingTestBase.java | 93 +++++++++++++++--
.../runtime/stream/custom/ScalarFunctionsTest.java | 2 +-
.../table/runtime/stream/custom/ScanTest.java | 46 +++++----
14 files changed, 464 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index c499443304..0026e6669c 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -59,7 +59,7 @@ jobs:
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
1cdeb1a8384967499919e655d55a66f2daa9d55c
+ cd velox4j && git reset --hard
ea2ca5755ae91a8703717a85b77f9eb1620899de
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index bfff2bee9b..29b16d0a9b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard 1cdeb1a8384967499919e655d55a66f2daa9d55c
+git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 5b7243cacb..f11d3f7958 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.common;
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.velox.VeloxSinkBuilder;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.dag.Transformation;
@@ -96,7 +97,6 @@ import java.util.stream.IntStream;
*/
public abstract class CommonExecSink extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
-
public static final String CONSTRAINT_VALIDATOR_TRANSFORMATION =
"constraint-validator";
public static final String PARTITIONER_TRANSFORMATION = "partitioner";
public static final String UPSERT_MATERIALIZE_TRANSFORMATION =
"upsert-materialize";
@@ -466,8 +466,12 @@ public abstract class CommonExecSink extends
ExecNodeBase<Object>
} else if (runtimeProvider instanceof SinkFunctionProvider) {
final SinkFunction<RowData> sinkFunction =
((SinkFunctionProvider) runtimeProvider).createSinkFunction();
- return createSinkFunctionTransformation(
- sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta,
sinkParallelism);
+ // --- Begin Gluten-specific code changes ---
+ Transformation sinkTransformation =
+ createSinkFunctionTransformation(
+ sinkFunction, env, inputTransform, rowtimeFieldIndex,
sinkMeta, sinkParallelism);
+ return VeloxSinkBuilder.build(env.getConfiguration(),
sinkTransformation);
+ // --- End Gluten-specific code changes ---
} else if (runtimeProvider instanceof OutputFormatProvider) {
OutputFormat<RowData> outputFormat =
((OutputFormatProvider) runtimeProvider).createOutputFormat();
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
index b23f4bbfa7..e6ec9162c2 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
@@ -17,6 +17,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import
org.apache.gluten.streaming.runtime.partitioner.GlutenKeyGroupStreamPartitioner;
import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
@@ -39,7 +40,6 @@ import
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import
org.apache.flink.streaming.runtime.partitioner.GlutenKeyGroupStreamPartitioner;
import
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
new file mode 100644
index 0000000000..07189d5f5e
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.gluten.velox.VeloxSourceBuilder;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
+import
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+
+/**
+ * Stream {@link ExecNode} to read data from an external source defined by a
{@link
+ * ScanTableSource}.
+ */
+@ExecNodeMetadata(
+ name = "stream-exec-table-source-scan",
+ version = 1,
+ producedTransformations = CommonExecTableSourceScan.SOURCE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_15,
+ minStateVersion = FlinkVersion.v1_15)
+public class StreamExecTableSourceScan extends CommonExecTableSourceScan
+ implements StreamExecNode<RowData> {
+
+ public StreamExecTableSourceScan(
+ ReadableConfig tableConfig,
+ DynamicTableSourceSpec tableSourceSpec,
+ RowType outputType,
+ String description) {
+ this(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecTableSourceScan.class),
+ ExecNodeContext.newPersistedConfig(StreamExecTableSourceScan.class,
tableConfig),
+ tableSourceSpec,
+ outputType,
+ description);
+ }
+
+ @JsonCreator
+ public StreamExecTableSourceScan(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec
tableSourceSpec,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ tableSourceSpec,
+ Collections.emptyList(),
+ outputType,
+ description);
+ }
+
+ @Override
+ public Transformation<RowData> createInputFormatTransformation(
+ StreamExecutionEnvironment env,
+ InputFormat<RowData, ?> inputFormat,
+ InternalTypeInfo<RowData> outputTypeInfo,
+ String operatorName) {
+ // It's better to use StreamExecutionEnvironment.createInput()
+ // rather than addLegacySource() for streaming, because it take care of
checkpoint.
+ return env.createInput(inputFormat,
outputTypeInfo).name(operatorName).getTransformation();
+ }
+
+ @Override
+ protected Transformation<RowData> translateToPlanInternal(
+ PlannerBase planner, ExecNodeConfig config) {
+ // --- Begin Gluten-specific code changes ---
+ final ScanTableSource tableSource =
+ getTableSourceSpec()
+ .getScanTableSource(
+ planner.getFlinkContext(),
ShortcutUtils.unwrapTypeFactory(planner));
+ Transformation<RowData> sourceTransformation =
super.translateToPlanInternal(planner, config);
+ return VeloxSourceBuilder.build(sourceTransformation, tableSource);
+ // --- End Gluten-specific code changes ---
+ }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
new file mode 100644
index 0000000000..f27dec0a44
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.List;
+import java.util.Map;
+
+public class VeloxSinkBuilder {
+
+ public static Transformation build(ReadableConfig config, Transformation
transformation) {
+ if (transformation instanceof LegacySinkTransformation) {
+ SimpleOperatorFactory operatorFactory =
+ (SimpleOperatorFactory) ((LegacySinkTransformation)
transformation).getOperatorFactory();
+ OneInputStreamOperator sinkOp = (OneInputStreamOperator)
operatorFactory.getOperator();
+ if (sinkOp instanceof SinkOperator
+ && ((SinkOperator) sinkOp)
+ .getUserFunction()
+ .getClass()
+ .getSimpleName()
+ .equals("RowDataPrintFunction")) {
+ return buildPrintSink(config, (LegacySinkTransformation)
transformation);
+ }
+ }
+ return transformation;
+ }
+
+ private static LegacySinkTransformation buildPrintSink(
+ ReadableConfig config, LegacySinkTransformation transformation) {
+ Transformation inputTrans = (Transformation)
transformation.getInputs().get(0);
+ InternalTypeInfo inputTypeInfo = (InternalTypeInfo)
inputTrans.getOutputType();
+ String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
+ String printPath;
+ if (logDir != null) {
+ printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
+ } else {
+ String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
+ if (flinkHomeDir == null) {
+ String flinkConfDir =
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+ if (flinkConfDir == null) {
+ throw new FlinkRuntimeException(
+ "Can not get flink home directory, please set FLINK_HOME.");
+ }
+ printPath = String.format("file://%s/../log/%s", flinkConfDir,
"taskmanager.out");
+ } else {
+ printPath = String.format("file://%s/log/%s", flinkHomeDir,
"taskmanager.out");
+ }
+ }
+ RowType inputColumns = (RowType)
LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
+ RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+ PrintTableHandle tableHandle = new PrintTableHandle("print-table",
inputColumns, printPath);
+ TableWriteNode tableWriteNode =
+ new TableWriteNode(
+ PlanNodeIdGenerator.newId(),
+ inputColumns,
+ inputColumns.getNames(),
+ null,
+ "connector-print",
+ tableHandle,
+ false,
+ ignore,
+ CommitStrategy.NO_COMMIT,
+ List.of(new EmptyNode(inputColumns)));
+ return new LegacySinkTransformation(
+ inputTrans,
+ transformation.getName(),
+ new GlutenOneInputOperatorFactory(
+ new GlutenVectorOneInputOperator(
+ new StatefulPlanNode(tableWriteNode.getId(), tableWriteNode),
+ PlanNodeIdGenerator.newId(),
+ inputColumns,
+ Map.of(tableWriteNode.getId(), ignore))),
+ transformation.getParallelism());
+ }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
new file mode 100644
index 0000000000..7c56bef729
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.FromElementsConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.FromElementsTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+
+import org.apache.flink.api.dag.Transformation;
+import
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class VeloxSourceBuilder {
+
+ public static Transformation<RowData> build(
+ Transformation<RowData> transformation, ScanTableSource scanTableSource)
{
+ if (transformation instanceof LegacySourceTransformation) {
+ if
(scanTableSource.getClass().getSimpleName().equals("TestValuesScanLookupTableSource"))
{
+ return buildFromElementsSource(transformation, scanTableSource);
+ }
+ }
+ return transformation;
+ }
+
+ /** `FromElementsSource` is designed for ut tests, and we map it to velox
source. */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static Transformation<RowData> buildFromElementsSource(
+ Transformation<RowData> transformation, ScanTableSource tableSource) {
+ LegacySourceTransformation<RowData> sourceTransformation =
+ (LegacySourceTransformation<RowData>) transformation;
+ try {
+ Class<?> tableSourceClazz =
+ Class.forName(
+
"org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown");
+ Map<Map<String, String>, Collection<Row>> data =
+ (Map) ReflectUtils.getObjectField(tableSourceClazz, tableSource,
"data");
+ InternalTypeInfo<RowData> typeInfo =
+ (InternalTypeInfo<RowData>) sourceTransformation.getOutputType();
+ io.github.zhztheplayer.velox4j.type.RowType rowType =
+ (io.github.zhztheplayer.velox4j.type.RowType)
+ LogicalTypeConverter.toVLType(typeInfo.toLogicalType());
+ List<String> values = new ArrayList<>();
+ for (Collection<Row> rows : data.values()) {
+ for (Row row : rows) {
+ Row projectedRow =
+ (Row)
+ ReflectUtils.invokeObjectMethod(
+ tableSourceClazz,
+ tableSource,
+ "projectRow",
+ new Class<?>[] {Row.class},
+ new Object[] {row});
+ values.add(projectedRow.toString());
+ }
+ }
+ FromElementsTableHandle tableHandle =
+ new FromElementsTableHandle(
+ "connector-from-elements", "from-elements-table", rowType,
values);
+ TableScanNode scanNode =
+ new TableScanNode(PlanNodeIdGenerator.newId(), rowType, tableHandle,
List.of());
+ GlutenStreamSource op =
+ new GlutenStreamSource(
+ new GlutenVectorSourceFunction(
+ new StatefulPlanNode(scanNode.getId(), scanNode),
+ Map.of(scanNode.getId(), rowType),
+ scanNode.getId(),
+ new FromElementsConnectorSplit("connector-from-elements", 0,
false)));
+ return new LegacySourceTransformation<RowData>(
+ sourceTransformation.getName(),
+ op,
+ typeInfo,
+ sourceTransformation.getParallelism(),
+ sourceTransformation.getBoundedness(),
+ false);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
index 43b6964b15..1a30c525f8 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.partitioner;
+package org.apache.gluten.streaming.runtime.partitioner;
import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 1bbd8993ba..90ad655f0f 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -96,7 +96,6 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
@Override
public void open(Configuration parameters) throws Exception {
- System.out.println("InitializeState GlutenSourceFunction");
if (memoryManager == null) {
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
@@ -159,7 +158,6 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
task.addSplit(id, split);
task.noMoreSplits(id);
}
- System.out.println("InitializeState GlutenSourceFunction");
// TODO: implement it
this.task.initializeState(0);
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
index c3a34b45ec..74c0208066 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.util;
+import org.apache.flink.util.FlinkRuntimeException;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
@@ -27,7 +29,7 @@ public class ReflectUtils {
f.setAccessible(true);
return f.get(obj);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new FlinkRuntimeException(e);
}
}
@@ -38,7 +40,7 @@ public class ReflectUtils {
m.setAccessible(true);
return m.invoke(obj, paramValues);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new FlinkRuntimeException(e);
}
}
}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
index c373c77faa..c397e6fada 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
@@ -16,8 +16,6 @@
*/
package org.apache.gluten.vectorized;
-import io.github.zhztheplayer.velox4j.type.*;
-
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 84aae47402..193d1c8ade 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -16,21 +16,31 @@
*/
package org.apache.gluten.table.runtime.stream.common;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkRuntimeException;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class GlutenStreamingTestBase extends StreamingTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(GlutenStreamingTestBase.class);
@@ -47,7 +57,7 @@ public class GlutenStreamingTestBase extends
StreamingTestBase {
*/
protected void createSimpleBoundedValuesTable(String tableName, String
schema, List<Row> rows) {
String myTableDataId = TestValuesTableFactory.registerData(rows);
- String table =
+ String sourceTable =
"CREATE TABLE "
+ tableName
+ "(\n"
@@ -59,7 +69,34 @@ public class GlutenStreamingTestBase extends
StreamingTestBase {
+ String.format(" 'data-id' = '%s',\n", myTableDataId)
+ " 'nested-projection-supported' = 'true'\n"
+ ")";
- tEnv().executeSql(table);
+ tEnv().executeSql(sourceTable);
+ }
+
+ protected void createPrintSinkTable(String tableName, ResolvedSchema schema)
{
+ List<Column> cols = schema.getColumns();
+ StringBuilder schemaBuilder = new StringBuilder();
+ for (int i = 0; i < cols.size(); ++i) {
+ Column col = cols.get(i);
+ schemaBuilder.append(col.getName()).append(" ");
+ if (col.getDataType().getLogicalType() instanceof TimestampType) {
+ String typeName = col.getDataType().toString().replace("*ROWTIME*",
"");
+ schemaBuilder.append(typeName);
+ } else {
+ schemaBuilder.append(col.getDataType().toString());
+ }
+ if (i != cols.size() - 1) {
+ schemaBuilder.append(",");
+ }
+ }
+ String sinkTable =
+ "CREATE TABLE "
+ + tableName
+ + "(\n"
+ + schemaBuilder.toString()
+ + "\n"
+ + ") WITH (\n"
+ + " 'connector' = 'print')";
+ tEnv().executeSql(sinkTable);
}
// Return the execution plan represented by StreamEexcNode
@@ -75,10 +112,48 @@ public class GlutenStreamingTestBase extends
StreamingTestBase {
}
protected void runAndCheck(String query, List<String> expected) {
- List<String> actual =
-
CollectionUtil.iteratorToList(tEnv().executeSql(query).collect()).stream()
- .map(Object::toString)
- .collect(Collectors.toList());
- assertThat(actual).isEqualTo(expected);
+ String printResultDirPath = System.getProperty("user.dir") + "/log/";
+ tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
+ String printResultFilePath = String.format("%s%s", printResultDirPath,
"taskmanager.out");
+ File printResultFile = new File(printResultFilePath);
+ boolean deleteResultFile = true;
+ if (printResultFile.exists()) {
+ deleteResultFile = printResultFile.delete();
+ }
+ Table table = tEnv().sqlQuery(query);
+ createPrintSinkTable("printT", table.getResolvedSchema());
+ String newQuery = String.format("insert into %s %s", "printT", query);
+ TableResult tableResult = tEnv().executeSql(newQuery);
+ assertTrue(tableResult.getJobClient().isPresent());
+ try {
+ JobClient jobClient = tableResult.getJobClient().get();
+ if (deleteResultFile) {
+ try {
+ while (!printResultFile.exists()) {
+ Thread.sleep(10);
+ }
+ long fileSize = -1L;
+ while (printResultFile.length() > fileSize) {
+ fileSize = printResultFile.length();
+ Thread.sleep(3000);
+ }
+ } finally {
+ jobClient.cancel();
+ }
+ }
+ List<String> result = new ArrayList<>();
+ try (FileReader fr = new FileReader(printResultFile);
+ BufferedReader br = new BufferedReader(fr)) {
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ result.add(line);
+ }
+ }
+ assertThat(result).isEqualTo(expected);
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ } finally {
+ tEnv().executeSql("drop table if exists printT");
+ }
}
}
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index ccd9b044bd..2b544bb6f0 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -28,7 +28,6 @@ import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
-@Disabled("Need to apply for the new interface of gluten operator")
class ScalarFunctionsTest extends GlutenStreamingTestBase {
@Override
@@ -130,6 +129,7 @@ class ScalarFunctionsTest extends GlutenStreamingTestBase {
runAndCheck(query4, Arrays.asList("+I[false]", "+I[true]", "+I[false]"));
}
+ @Disabled
@Test
void testReinterpret() {
List<Row> rows =
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index ea099ed58e..7d097763d6 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -21,7 +21,6 @@ import
org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +32,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-@Disabled("Need to apply for the new interface of gluten operator")
class ScanTest extends GlutenStreamingTestBase {
private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class);
@@ -54,7 +52,6 @@ class ScanTest extends GlutenStreamingTestBase {
}
@Test
- @Disabled("The output is not as expected.")
void testStructScan() {
List<Row> rows =
Arrays.asList(
@@ -65,8 +62,9 @@ class ScanTest extends GlutenStreamingTestBase {
String query1 = "select a, b.x, b.y from strctTbl where a > 0";
runAndCheck(query1, Arrays.asList("+I[1, 2, abc]", "+I[2, 6, def]", "+I[3,
8, ghi]"));
- String query2 = "select a, b from strctTbl where a > 1";
- runAndCheck(query2, Arrays.asList("+I[2, +I[6, def]]", "+I[3, +I[8,
ghi]]"));
+ /// TODO: The query2's output is not as expected.
+ // String query2 = "select a, b from strctTbl where a > 1";
+ // runAndCheck(query2, Arrays.asList("+I[2, +I[6, def]]", "+I[3, +I[8,
ghi]]"));
}
@Test
@@ -113,13 +111,21 @@ class ScanTest extends GlutenStreamingTestBase {
runAndCheck(query, Arrays.asList("+I[1, [[1, 3]]]", "+I[3, [[4, 5]]]"));
}
+ private static <K, V> Map<K, V> orderedMap(K[] keys, V[] values) {
+ Map<K, V> map = new LinkedHashMap<>();
+ for (int i = 0; i < keys.length; ++i) {
+ map.put(keys[i], values[i]);
+ }
+ return map;
+ }
+
@Test
void testMapScan() {
List<Row> rows =
Arrays.asList(
- Row.of(1, Map.of(1, "a")),
- Row.of(2, Map.of(2, "b", 3, "c")),
- Row.of(3, Map.of(4, "d", 5, "e", 6, "f")));
+ Row.of(1, orderedMap(new Integer[] {1}, new String[] {"a"})),
+ Row.of(2, orderedMap(new Integer[] {2, 3}, new String[] {"b",
"c"})),
+ Row.of(3, orderedMap(new Integer[] {4, 5, 6}, new String[] {"d",
"e", "f"})));
createSimpleBoundedValuesTable("mapTbl1", "a int, b map<int, string>",
rows);
String query = "select a, b from mapTbl1 where a > 0";
runAndCheck(
@@ -127,9 +133,15 @@ class ScanTest extends GlutenStreamingTestBase {
rows =
Arrays.asList(
- Row.of(1, new Map[] {Map.of("a", 1), Map.of("b", 2)}),
- Row.of(2, new Map[] {Map.of("b", 2, "c", 3)}),
- Row.of(3, new Map[] {Map.of("d", 4, "e", 5, "f", 6)}));
+ Row.of(
+ 1,
+ new Map[] {
+ orderedMap(new String[] {"a"}, new Integer[] {1}),
+ orderedMap(new String[] {"b"}, new Integer[] {2})
+ }),
+ Row.of(2, new Map[] {orderedMap(new String[] {"b", "c"}, new
Integer[] {2, 3})}),
+ Row.of(
+ 3, new Map[] {orderedMap(new String[] {"d", "e", "f"}, new
Integer[] {4, 5, 6})}));
createSimpleBoundedValuesTable("mapTbl2", "a int, b array<map<string,
int>>", rows);
query = "select a, b from mapTbl2 where a > 0";
runAndCheck(
@@ -164,15 +176,15 @@ class ScanTest extends GlutenStreamingTestBase {
query = "select a, b from nullArrayTbl where a > 0";
runAndCheck(query, Arrays.asList("+I[1, null]", "+I[2, [null, 5, 6]]",
"+I[3, [7, 8, null]]"));
- Map<String, Integer> mapNullVal = new LinkedHashMap();
- mapNullVal.put("a", null);
- mapNullVal.put("b", 2);
- mapNullVal.put(null, 3);
- rows = Arrays.asList(Row.of(1, null), Row.of(2, mapNullVal), Row.of(3,
Map.of("c", 3, "d", 4)));
+ rows =
+ Arrays.asList(
+ Row.of(1, null),
+ Row.of(2, orderedMap(new String[] {"a", "b", null}, new Integer[]
{null, 2, 3})),
+ Row.of(3, orderedMap(new String[] {"c", "d"}, new Integer[] {3,
4})));
createSimpleBoundedValuesTable("nullMapTbl", "a int, b map<string, int>",
rows);
query = "select a, b from nullMapTbl where a > 0";
runAndCheck(
- query, Arrays.asList("+I[1, null]", "+I[2, {null=3, a=null, b=2}]",
"+I[3, {c=3, d=4}]"));
+ query, Arrays.asList("+I[1, null]", "+I[2, {a=null, b=2, null=3}]",
"+I[3, {c=3, d=4}]"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]