This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f3bc4f172 [GLUTEN-10361][FLINK] Fix UT failure between the conversion 
of `BinaryRowData` and `StatefulRecord` (#10362)
7f3bc4f172 is described below

commit 7f3bc4f17240cdf1a41245641d743d84c6d6e4b8
Author: kevinyhzou <[email protected]>
AuthorDate: Tue Sep 9 12:07:42 2025 +0800

    [GLUTEN-10361][FLINK] Fix UT failure between the conversion of 
`BinaryRowData` and `StatefulRecord` (#10362)
    
    * fix ut by using VectorSource and PrintSink
---
 .github/workflows/flink.yml                        |   2 +-
 gluten-flink/docs/Flink.md                         |   2 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  10 +-
 .../plan/nodes/exec/stream/StreamExecExchange.java |   2 +-
 .../exec/stream/StreamExecTableSourceScan.java     | 112 +++++++++++++++++++++
 .../org/apache/gluten/velox/VeloxSinkBuilder.java  | 112 +++++++++++++++++++++
 .../apache/gluten/velox/VeloxSourceBuilder.java    | 109 ++++++++++++++++++++
 .../GlutenKeyGroupStreamPartitioner.java           |   4 +-
 .../operators/GlutenVectorSourceFunction.java      |   2 -
 .../java/org/apache/gluten/util/ReflectUtils.java  |   6 +-
 .../gluten/vectorized/ArrowVectorAccessor.java     |   2 -
 .../stream/common/GlutenStreamingTestBase.java     |  93 +++++++++++++++--
 .../runtime/stream/custom/ScalarFunctionsTest.java |   2 +-
 .../table/runtime/stream/custom/ScanTest.java      |  46 +++++----
 14 files changed, 464 insertions(+), 40 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index c499443304..0026e6669c 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -59,7 +59,7 @@ jobs:
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
1cdeb1a8384967499919e655d55a66f2daa9d55c
+          cd velox4j && git reset --hard 
ea2ca5755ae91a8703717a85b77f9eb1620899de
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index bfff2bee9b..29b16d0a9b 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 1cdeb1a8384967499919e655d55a66f2daa9d55c
+git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 5b7243cacb..f11d3f7958 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.common;
 import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.velox.VeloxSinkBuilder;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.dag.Transformation;
@@ -96,7 +97,6 @@ import java.util.stream.IntStream;
  */
 public abstract class CommonExecSink extends ExecNodeBase<Object>
     implements MultipleTransformationTranslator<Object> {
-
   public static final String CONSTRAINT_VALIDATOR_TRANSFORMATION = 
"constraint-validator";
   public static final String PARTITIONER_TRANSFORMATION = "partitioner";
   public static final String UPSERT_MATERIALIZE_TRANSFORMATION = 
"upsert-materialize";
@@ -466,8 +466,12 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
       } else if (runtimeProvider instanceof SinkFunctionProvider) {
         final SinkFunction<RowData> sinkFunction =
             ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
-        return createSinkFunctionTransformation(
-            sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, 
sinkParallelism);
+        // --- Begin Gluten-specific code changes ---
+        Transformation sinkTransformation =
+            createSinkFunctionTransformation(
+                sinkFunction, env, inputTransform, rowtimeFieldIndex, 
sinkMeta, sinkParallelism);
+        return VeloxSinkBuilder.build(env.getConfiguration(), 
sinkTransformation);
+        // --- End Gluten-specific code changes ---
       } else if (runtimeProvider instanceof OutputFormatProvider) {
         OutputFormat<RowData> outputFormat =
             ((OutputFormatProvider) runtimeProvider).createOutputFormat();
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
index b23f4bbfa7..e6ec9162c2 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecExchange.java
@@ -17,6 +17,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import 
org.apache.gluten.streaming.runtime.partitioner.GlutenKeyGroupStreamPartitioner;
 import org.apache.gluten.table.runtime.keyselector.GlutenKeySelector;
 import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
@@ -39,7 +40,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import 
org.apache.flink.streaming.runtime.partitioner.GlutenKeyGroupStreamPartitioner;
 import 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.table.api.TableException;
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
new file mode 100644
index 0000000000..07189d5f5e
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTableSourceScan.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.gluten.velox.VeloxSourceBuilder;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+
+/**
+ * Stream {@link ExecNode} to read data from an external source defined by a 
{@link
+ * ScanTableSource}.
+ */
+@ExecNodeMetadata(
+    name = "stream-exec-table-source-scan",
+    version = 1,
+    producedTransformations = CommonExecTableSourceScan.SOURCE_TRANSFORMATION,
+    minPlanVersion = FlinkVersion.v1_15,
+    minStateVersion = FlinkVersion.v1_15)
+public class StreamExecTableSourceScan extends CommonExecTableSourceScan
+    implements StreamExecNode<RowData> {
+
+  public StreamExecTableSourceScan(
+      ReadableConfig tableConfig,
+      DynamicTableSourceSpec tableSourceSpec,
+      RowType outputType,
+      String description) {
+    this(
+        ExecNodeContext.newNodeId(),
+        ExecNodeContext.newContext(StreamExecTableSourceScan.class),
+        ExecNodeContext.newPersistedConfig(StreamExecTableSourceScan.class, 
tableConfig),
+        tableSourceSpec,
+        outputType,
+        description);
+  }
+
+  @JsonCreator
+  public StreamExecTableSourceScan(
+      @JsonProperty(FIELD_NAME_ID) int id,
+      @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+      @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+      @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec 
tableSourceSpec,
+      @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+      @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+    super(
+        id,
+        context,
+        persistedConfig,
+        tableSourceSpec,
+        Collections.emptyList(),
+        outputType,
+        description);
+  }
+
+  @Override
+  public Transformation<RowData> createInputFormatTransformation(
+      StreamExecutionEnvironment env,
+      InputFormat<RowData, ?> inputFormat,
+      InternalTypeInfo<RowData> outputTypeInfo,
+      String operatorName) {
+    // It's better to use StreamExecutionEnvironment.createInput()
+    // rather than addLegacySource() for streaming, because it take care of 
checkpoint.
+    return env.createInput(inputFormat, 
outputTypeInfo).name(operatorName).getTransformation();
+  }
+
+  @Override
+  protected Transformation<RowData> translateToPlanInternal(
+      PlannerBase planner, ExecNodeConfig config) {
+    // --- Begin Gluten-specific code changes ---
+    final ScanTableSource tableSource =
+        getTableSourceSpec()
+            .getScanTableSource(
+                planner.getFlinkContext(), 
ShortcutUtils.unwrapTypeFactory(planner));
+    Transformation<RowData> sourceTransformation = 
super.translateToPlanInternal(planner, config);
+    return VeloxSourceBuilder.build(sourceTransformation, tableSource);
+    // --- End Gluten-specific code changes ---
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
new file mode 100644
index 0000000000..f27dec0a44
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSinkBuilder.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenOneInputOperatorFactory;
+import org.apache.gluten.table.runtime.operators.GlutenVectorOneInputOperator;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+
+import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
+import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
+import io.github.zhztheplayer.velox4j.plan.EmptyNode;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableWriteNode;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.table.runtime.operators.sink.SinkOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.List;
+import java.util.Map;
+
+public class VeloxSinkBuilder {
+
+  public static Transformation build(ReadableConfig config, Transformation 
transformation) {
+    if (transformation instanceof LegacySinkTransformation) {
+      SimpleOperatorFactory operatorFactory =
+          (SimpleOperatorFactory) ((LegacySinkTransformation) 
transformation).getOperatorFactory();
+      OneInputStreamOperator sinkOp = (OneInputStreamOperator) 
operatorFactory.getOperator();
+      if (sinkOp instanceof SinkOperator
+          && ((SinkOperator) sinkOp)
+              .getUserFunction()
+              .getClass()
+              .getSimpleName()
+              .equals("RowDataPrintFunction")) {
+        return buildPrintSink(config, (LegacySinkTransformation) 
transformation);
+      }
+    }
+    return transformation;
+  }
+
+  private static LegacySinkTransformation buildPrintSink(
+      ReadableConfig config, LegacySinkTransformation transformation) {
+    Transformation inputTrans = (Transformation) 
transformation.getInputs().get(0);
+    InternalTypeInfo inputTypeInfo = (InternalTypeInfo) 
inputTrans.getOutputType();
+    String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
+    String printPath;
+    if (logDir != null) {
+      printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
+    } else {
+      String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
+      if (flinkHomeDir == null) {
+        String flinkConfDir = 
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+        if (flinkConfDir == null) {
+          throw new FlinkRuntimeException(
+              "Can not get flink home directory, please set FLINK_HOME.");
+        }
+        printPath = String.format("file://%s/../log/%s", flinkConfDir, 
"taskmanager.out");
+      } else {
+        printPath = String.format("file://%s/log/%s", flinkHomeDir, 
"taskmanager.out");
+      }
+    }
+    RowType inputColumns = (RowType) 
LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
+    RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
+    PrintTableHandle tableHandle = new PrintTableHandle("print-table", 
inputColumns, printPath);
+    TableWriteNode tableWriteNode =
+        new TableWriteNode(
+            PlanNodeIdGenerator.newId(),
+            inputColumns,
+            inputColumns.getNames(),
+            null,
+            "connector-print",
+            tableHandle,
+            false,
+            ignore,
+            CommitStrategy.NO_COMMIT,
+            List.of(new EmptyNode(inputColumns)));
+    return new LegacySinkTransformation(
+        inputTrans,
+        transformation.getName(),
+        new GlutenOneInputOperatorFactory(
+            new GlutenVectorOneInputOperator(
+                new StatefulPlanNode(tableWriteNode.getId(), tableWriteNode),
+                PlanNodeIdGenerator.newId(),
+                inputColumns,
+                Map.of(tableWriteNode.getId(), ignore))),
+        transformation.getParallelism());
+  }
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
new file mode 100644
index 0000000000..7c56bef729
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/velox/VeloxSourceBuilder.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.velox;
+
+import org.apache.gluten.streaming.api.operators.GlutenStreamSource;
+import org.apache.gluten.table.runtime.operators.GlutenVectorSourceFunction;
+import org.apache.gluten.util.LogicalTypeConverter;
+import org.apache.gluten.util.PlanNodeIdGenerator;
+import org.apache.gluten.util.ReflectUtils;
+
+import io.github.zhztheplayer.velox4j.connector.FromElementsConnectorSplit;
+import io.github.zhztheplayer.velox4j.connector.FromElementsTableHandle;
+import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+
+import org.apache.flink.api.dag.Transformation;
+import 
org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class VeloxSourceBuilder {
+
+  public static Transformation<RowData> build(
+      Transformation<RowData> transformation, ScanTableSource scanTableSource) 
{
+    if (transformation instanceof LegacySourceTransformation) {
+      if 
(scanTableSource.getClass().getSimpleName().equals("TestValuesScanLookupTableSource"))
 {
+        return buildFromElementsSource(transformation, scanTableSource);
+      }
+    }
+    return transformation;
+  }
+
+  /** `FromElementsSource` is designed for ut tests, and we map it to velox 
source. */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static Transformation<RowData> buildFromElementsSource(
+      Transformation<RowData> transformation, ScanTableSource tableSource) {
+    LegacySourceTransformation<RowData> sourceTransformation =
+        (LegacySourceTransformation<RowData>) transformation;
+    try {
+      Class<?> tableSourceClazz =
+          Class.forName(
+              
"org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown");
+      Map<Map<String, String>, Collection<Row>> data =
+          (Map) ReflectUtils.getObjectField(tableSourceClazz, tableSource, 
"data");
+      InternalTypeInfo<RowData> typeInfo =
+          (InternalTypeInfo<RowData>) sourceTransformation.getOutputType();
+      io.github.zhztheplayer.velox4j.type.RowType rowType =
+          (io.github.zhztheplayer.velox4j.type.RowType)
+              LogicalTypeConverter.toVLType(typeInfo.toLogicalType());
+      List<String> values = new ArrayList<>();
+      for (Collection<Row> rows : data.values()) {
+        for (Row row : rows) {
+          Row projectedRow =
+              (Row)
+                  ReflectUtils.invokeObjectMethod(
+                      tableSourceClazz,
+                      tableSource,
+                      "projectRow",
+                      new Class<?>[] {Row.class},
+                      new Object[] {row});
+          values.add(projectedRow.toString());
+        }
+      }
+      FromElementsTableHandle tableHandle =
+          new FromElementsTableHandle(
+              "connector-from-elements", "from-elements-table", rowType, 
values);
+      TableScanNode scanNode =
+          new TableScanNode(PlanNodeIdGenerator.newId(), rowType, tableHandle, 
List.of());
+      GlutenStreamSource op =
+          new GlutenStreamSource(
+              new GlutenVectorSourceFunction(
+                  new StatefulPlanNode(scanNode.getId(), scanNode),
+                  Map.of(scanNode.getId(), rowType),
+                  scanNode.getId(),
+                  new FromElementsConnectorSplit("connector-from-elements", 0, 
false)));
+      return new LegacySourceTransformation<RowData>(
+          sourceTransformation.getName(),
+          op,
+          typeInfo,
+          sourceTransformation.getParallelism(),
+          sourceTransformation.getBoundedness(),
+          false);
+    } catch (Exception e) {
+      throw new FlinkRuntimeException(e);
+    }
+  }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
index 43b6964b15..1a30c525f8 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/streaming/runtime/partitioner/GlutenKeyGroupStreamPartitioner.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.partitioner;
+package org.apache.gluten.streaming.runtime.partitioner;
 
 import io.github.zhztheplayer.velox4j.stateful.StatefulRecord;
 
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import 
org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 1bbd8993ba..90ad655f0f 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -96,7 +96,6 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    System.out.println("InitializeState GlutenSourceFunction");
     if (memoryManager == null) {
       memoryManager = MemoryManager.create(AllocationListener.NOOP);
       session = Velox4j.newSession(memoryManager);
@@ -159,7 +158,6 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
       task.addSplit(id, split);
       task.noMoreSplits(id);
     }
-    System.out.println("InitializeState GlutenSourceFunction");
     // TODO: implement it
     this.task.initializeState(0);
   }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
index c3a34b45ec..74c0208066 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/ReflectUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.util;
 
+import org.apache.flink.util.FlinkRuntimeException;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 
@@ -27,7 +29,7 @@ public class ReflectUtils {
       f.setAccessible(true);
       return f.get(obj);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new FlinkRuntimeException(e);
     }
   }
 
@@ -38,7 +40,7 @@ public class ReflectUtils {
       m.setAccessible(true);
       return m.invoke(obj, paramValues);
     } catch (Exception e) {
-      throw new RuntimeException(e);
+      throw new FlinkRuntimeException(e);
     }
   }
 }
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
index c373c77faa..c397e6fada 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorAccessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.vectorized;
 
-import io.github.zhztheplayer.velox4j.type.*;
-
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
index 84aae47402..193d1c8ade 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -16,21 +16,31 @@
  */
 package org.apache.gluten.table.runtime.stream.common;
 
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class GlutenStreamingTestBase extends StreamingTestBase {
   private static final Logger LOG = 
LoggerFactory.getLogger(GlutenStreamingTestBase.class);
@@ -47,7 +57,7 @@ public class GlutenStreamingTestBase extends 
StreamingTestBase {
    */
   protected void createSimpleBoundedValuesTable(String tableName, String 
schema, List<Row> rows) {
     String myTableDataId = TestValuesTableFactory.registerData(rows);
-    String table =
+    String sourceTable =
         "CREATE TABLE "
             + tableName
             + "(\n"
@@ -59,7 +69,34 @@ public class GlutenStreamingTestBase extends 
StreamingTestBase {
             + String.format(" 'data-id' = '%s',\n", myTableDataId)
             + " 'nested-projection-supported' = 'true'\n"
             + ")";
-    tEnv().executeSql(table);
+    tEnv().executeSql(sourceTable);
+  }
+
+  protected void createPrintSinkTable(String tableName, ResolvedSchema schema) 
{
+    List<Column> cols = schema.getColumns();
+    StringBuilder schemaBuilder = new StringBuilder();
+    for (int i = 0; i < cols.size(); ++i) {
+      Column col = cols.get(i);
+      schemaBuilder.append(col.getName()).append(" ");
+      if (col.getDataType().getLogicalType() instanceof TimestampType) {
+        String typeName = col.getDataType().toString().replace("*ROWTIME*", 
"");
+        schemaBuilder.append(typeName);
+      } else {
+        schemaBuilder.append(col.getDataType().toString());
+      }
+      if (i != cols.size() - 1) {
+        schemaBuilder.append(",");
+      }
+    }
+    String sinkTable =
+        "CREATE TABLE "
+            + tableName
+            + "(\n"
+            + schemaBuilder.toString()
+            + "\n"
+            + ") WITH (\n"
+            + " 'connector' = 'print')";
+    tEnv().executeSql(sinkTable);
   }
 
  // Return the execution plan represented by StreamExecNode
@@ -75,10 +112,48 @@ public class GlutenStreamingTestBase extends 
StreamingTestBase {
   }
 
   protected void runAndCheck(String query, List<String> expected) {
-    List<String> actual =
-        
CollectionUtil.iteratorToList(tEnv().executeSql(query).collect()).stream()
-            .map(Object::toString)
-            .collect(Collectors.toList());
-    assertThat(actual).isEqualTo(expected);
+    String printResultDirPath = System.getProperty("user.dir") + "/log/";
+    tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
+    String printResultFilePath = String.format("%s%s", printResultDirPath, 
"taskmanager.out");
+    File printResultFile = new File(printResultFilePath);
+    boolean deleteResultFile = true;
+    if (printResultFile.exists()) {
+      deleteResultFile = printResultFile.delete();
+    }
+    Table table = tEnv().sqlQuery(query);
+    createPrintSinkTable("printT", table.getResolvedSchema());
+    String newQuery = String.format("insert into %s %s", "printT", query);
+    TableResult tableResult = tEnv().executeSql(newQuery);
+    assertTrue(tableResult.getJobClient().isPresent());
+    try {
+      JobClient jobClient = tableResult.getJobClient().get();
+      if (deleteResultFile) {
+        try {
+          while (!printResultFile.exists()) {
+            Thread.sleep(10);
+          }
+          long fileSize = -1L;
+          while (printResultFile.length() > fileSize) {
+            fileSize = printResultFile.length();
+            Thread.sleep(3000);
+          }
+        } finally {
+          jobClient.cancel();
+        }
+      }
+      List<String> result = new ArrayList<>();
+      try (FileReader fr = new FileReader(printResultFile);
+          BufferedReader br = new BufferedReader(fr)) {
+        String line = null;
+        while ((line = br.readLine()) != null) {
+          result.add(line);
+        }
+      }
+      assertThat(result).isEqualTo(expected);
+    } catch (Exception e) {
+      throw new FlinkRuntimeException(e);
+    } finally {
+      tEnv().executeSql("drop table if exists printT");
+    }
   }
 }
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index ccd9b044bd..2b544bb6f0 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -28,7 +28,6 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
 
-@Disabled("Need to apply for the new interface of gluten operator")
 class ScalarFunctionsTest extends GlutenStreamingTestBase {
 
   @Override
@@ -130,6 +129,7 @@ class ScalarFunctionsTest extends GlutenStreamingTestBase {
     runAndCheck(query4, Arrays.asList("+I[false]", "+I[true]", "+I[false]"));
   }
 
+  @Disabled
   @Test
   void testReinterpret() {
     List<Row> rows =
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
index ea099ed58e..7d097763d6 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScanTest.java
@@ -21,7 +21,6 @@ import 
org.apache.gluten.table.runtime.stream.common.GlutenStreamingTestBase;
 import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +32,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-@Disabled("Need to apply for the new interface of gluten operator")
 class ScanTest extends GlutenStreamingTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class);
 
@@ -54,7 +52,6 @@ class ScanTest extends GlutenStreamingTestBase {
   }
 
   @Test
-  @Disabled("The output is not as expected.")
   void testStructScan() {
     List<Row> rows =
         Arrays.asList(
@@ -65,8 +62,9 @@ class ScanTest extends GlutenStreamingTestBase {
     String query1 = "select a, b.x, b.y from strctTbl where a > 0";
     runAndCheck(query1, Arrays.asList("+I[1, 2, abc]", "+I[2, 6, def]", "+I[3, 
8, ghi]"));
 
-    String query2 = "select a, b from strctTbl where a > 1";
-    runAndCheck(query2, Arrays.asList("+I[2, +I[6, def]]", "+I[3, +I[8, 
ghi]]"));
+    // TODO: The output of query2 is not as expected.
+    // String query2 = "select a, b from strctTbl where a > 1";
+    // runAndCheck(query2, Arrays.asList("+I[2, +I[6, def]]", "+I[3, +I[8, 
ghi]]"));
   }
 
   @Test
@@ -113,13 +111,21 @@ class ScanTest extends GlutenStreamingTestBase {
     runAndCheck(query, Arrays.asList("+I[1, [[1, 3]]]", "+I[3, [[4, 5]]]"));
   }
 
+  private static <K, V> Map<K, V> orderedMap(K[] keys, V[] values) {
+    Map<K, V> map = new LinkedHashMap<>();
+    for (int i = 0; i < keys.length; ++i) {
+      map.put(keys[i], values[i]);
+    }
+    return map;
+  }
+
   @Test
   void testMapScan() {
     List<Row> rows =
         Arrays.asList(
-            Row.of(1, Map.of(1, "a")),
-            Row.of(2, Map.of(2, "b", 3, "c")),
-            Row.of(3, Map.of(4, "d", 5, "e", 6, "f")));
+            Row.of(1, orderedMap(new Integer[] {1}, new String[] {"a"})),
+            Row.of(2, orderedMap(new Integer[] {2, 3}, new String[] {"b", 
"c"})),
+            Row.of(3, orderedMap(new Integer[] {4, 5, 6}, new String[] {"d", 
"e", "f"})));
     createSimpleBoundedValuesTable("mapTbl1", "a int, b map<int, string>", 
rows);
     String query = "select a, b from mapTbl1 where a > 0";
     runAndCheck(
@@ -127,9 +133,15 @@ class ScanTest extends GlutenStreamingTestBase {
 
     rows =
         Arrays.asList(
-            Row.of(1, new Map[] {Map.of("a", 1), Map.of("b", 2)}),
-            Row.of(2, new Map[] {Map.of("b", 2, "c", 3)}),
-            Row.of(3, new Map[] {Map.of("d", 4, "e", 5, "f", 6)}));
+            Row.of(
+                1,
+                new Map[] {
+                  orderedMap(new String[] {"a"}, new Integer[] {1}),
+                  orderedMap(new String[] {"b"}, new Integer[] {2})
+                }),
+            Row.of(2, new Map[] {orderedMap(new String[] {"b", "c"}, new 
Integer[] {2, 3})}),
+            Row.of(
+                3, new Map[] {orderedMap(new String[] {"d", "e", "f"}, new 
Integer[] {4, 5, 6})}));
     createSimpleBoundedValuesTable("mapTbl2", "a int, b array<map<string, 
int>>", rows);
     query = "select a, b from mapTbl2 where a > 0";
     runAndCheck(
@@ -164,15 +176,15 @@ class ScanTest extends GlutenStreamingTestBase {
     query = "select a, b from nullArrayTbl where a > 0";
     runAndCheck(query, Arrays.asList("+I[1, null]", "+I[2, [null, 5, 6]]", 
"+I[3, [7, 8, null]]"));
 
-    Map<String, Integer> mapNullVal = new LinkedHashMap();
-    mapNullVal.put("a", null);
-    mapNullVal.put("b", 2);
-    mapNullVal.put(null, 3);
-    rows = Arrays.asList(Row.of(1, null), Row.of(2, mapNullVal), Row.of(3, 
Map.of("c", 3, "d", 4)));
+    rows =
+        Arrays.asList(
+            Row.of(1, null),
+            Row.of(2, orderedMap(new String[] {"a", "b", null}, new Integer[] 
{null, 2, 3})),
+            Row.of(3, orderedMap(new String[] {"c", "d"}, new Integer[] {3, 
4})));
     createSimpleBoundedValuesTable("nullMapTbl", "a int, b map<string, int>", 
rows);
     query = "select a, b from nullMapTbl where a > 0";
     runAndCheck(
-        query, Arrays.asList("+I[1, null]", "+I[2, {null=3, a=null, b=2}]", 
"+I[3, {c=3, d=4}]"));
+        query, Arrays.asList("+I[1, null]", "+I[2, {a=null, b=2, null=3}]", 
"+I[3, {c=3, d=4}]"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to