This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c6a61b45 [INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
4c6a61b45 is described below

commit 4c6a61b45b29aa49658a86d8799600538f86156e
Author: averyzhang <dearzhangzuof...@gmail.com>
AuthorDate: Fri Dec 9 19:52:54 2022 +0800

    [INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   2 +
 .../sort/protocol/node/load/HudiLoadNode.java      | 176 +++++++++++++++++++
 .../sort/protocol/node/load/HudiLoadNodeTest.java  |  57 +++++++
 .../inlong/sort/parser/HudiNodeSqlParserTest.java  | 186 +++++++++++++++++++++
 5 files changed, 424 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 72cc42931..da8859e8c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -71,7 +72,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
-        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad")
+        @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
+        @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
 })
 @NoArgsConstructor
 @Data
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 7f37075e0..087a5a9a7 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -41,6 +41,7 @@ import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
 import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -90,6 +91,7 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = GreenplumLoadNode.class, name = "greenplumLoad"),
         @JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name = "dlcIcebergLoad"),
         @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+        @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
         @JsonSubTypes.Type(value = StarRocksLoadNode.class, name = "starRocksLoad"),
 })
 public interface Node {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
new file mode 100644
index 000000000..86306dea0
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -0,0 +1,176 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+@JsonTypeName("hudiLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class HudiLoadNode extends LoadNode implements InlongMetric, Serializable {
+
+    private static final long serialVersionUID = -1L;
+
+    private static final String HUDI_OPTION_HIVE_SYNC_ENABLED = "hive_sync.enabled";
+    private static final String HUDI_OPTION_HIVE_SYNC_DB = "hive_sync.db";
+    private static final String HUDI_OPTION_HIVE_SYNC_TABLE = "hive_sync.table";
+    private static final String HUDI_OPTION_HIVE_SYNC_FILE_FORMAT = "hive_sync.file_format";
+    private static final String HUDI_OPTION_HIVE_SYNC_MODE = "hive_sync.mode";
+    private static final String HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE = "hms";
+    private static final String HUDI_OPTION_HIVE_SYNC_METASTORE_URIS = "hive_sync.metastore.uris";
+    private static final String HUDI_OPTION_DEFAULT_PATH = "path";
+    private static final String HUDI_OPTION_DATABASE_NAME = "hoodie.database.name";
+    private static final String HUDI_OPTION_TABLE_NAME = "hoodie.table.name";
+    private static final String HUDI_OPTION_RECORDKEY_FIELD_NAME = "hoodie.datasource.write.recordkey.field";
+    private static final String HUDI_OPTION_PARTITIONPATH_FIELD_NAME = "hoodie.datasource.write.partitionpath.field";
+    private static final String DDL_ATTRIBUTE_HUDI = "hudi.";
+
+    @JsonProperty("tableName")
+    @Nonnull
+    private String tableName;
+
+    @JsonProperty("dbName")
+    @Nonnull
+    private String dbName;
+
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+
+    @JsonProperty("catalogType")
+    private CatalogType catalogType;
+
+    @JsonProperty("uri")
+    private String uri;
+
+    @JsonProperty("warehouse")
+    private String warehouse;
+
+    @JsonProperty("extList")
+    private List<HashMap<String, String>> extList;
+
+    @JsonProperty("partitionFields")
+    private List<FieldInfo> partitionFields;
+
+    @JsonCreator
+    public HudiLoadNode(
+            @JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @Nonnull @JsonProperty("dbName") String dbName,
+            @Nonnull @JsonProperty("tableName") String tableName,
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("catalogType") CatalogType catalogType,
+            @JsonProperty("uri") String uri,
+            @JsonProperty("warehouse") String warehouse,
+            @JsonProperty("extList") List<HashMap<String, String>> extList,
+            @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+        super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
+        this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
+        this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+        this.primaryKey = primaryKey;
+        this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
+        this.uri = uri;
+        this.warehouse = warehouse;
+        this.extList = extList;
+        this.partitionFields = partitionFields;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+
+        options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, "true");
+        options.put(HUDI_OPTION_HIVE_SYNC_MODE, HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE);
+        options.put(HUDI_OPTION_HIVE_SYNC_DB, dbName);
+        options.put(HUDI_OPTION_HIVE_SYNC_TABLE, tableName);
+        options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
+
+        // partition field
+        if (partitionFields != null && !partitionFields.isEmpty()) {
+            String partitionKey =
+                    partitionFields.stream()
+                            .map(FieldInfo::getName)
+                            .collect(Collectors.joining(","));
+            options.put(HUDI_OPTION_PARTITIONPATH_FIELD_NAME, partitionKey);
+        }
+
+        if (extList != null) {
+            extList.forEach(ext -> {
+                String keyName = ext.get("keyName");
+                if (StringUtils.isNotBlank(keyName) &&
+                        keyName.startsWith(DDL_ATTRIBUTE_HUDI)) {
+                    options.put(keyName, ext.get("keyValue"));
+                }
+            });
+        }
+
+        String path = String.format("%s/%s.db/%s", warehouse, dbName, tableName);
+        options.put(HUDI_OPTION_DEFAULT_PATH, path);
+
+        options.put(HUDI_OPTION_DATABASE_NAME, dbName);
+        options.put(HUDI_OPTION_TABLE_NAME, tableName);
+        options.put(HUDI_OPTION_RECORDKEY_FIELD_NAME, primaryKey);
+        options.put("connector", "hudi-inlong");
+
+        return options;
+    }
+
+    @Override
+    public String genTableName() {
+        return tableName;
+    }
+
+    @Override
+    public String getPrimaryKey() {
+        return primaryKey;
+    }
+
+    @Override
+    public List<FieldInfo> getPartitionFields() {
+        return super.getPartitionFields();
+    }
+
+}
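
For orientation, here is a minimal sketch (not part of this patch) of the sink options the new tableOptions() method assembles. The wrapper class and the sample values are invented for illustration; the constructor arguments and option keys come from HudiLoadNode above, and it assumes LoadNode.tableOptions() tolerates null properties as it does for the other load nodes:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.inlong.sort.formats.common.StringFormatInfo;
    import org.apache.inlong.sort.protocol.FieldInfo;
    import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
    import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
    import org.apache.inlong.sort.protocol.transformation.FieldRelation;

    public class HudiLoadNodeOptionsSketch {
        public static void main(String[] args) {
            FieldInfo id = new FieldInfo("id", new StringFormatInfo());
            // Sample values only; any reachable metastore URI and warehouse path work.
            HudiLoadNode node = new HudiLoadNode("1", "hudi_out",
                    Collections.singletonList(id),
                    Collections.singletonList(new FieldRelation(id, id)),
                    null, null, 1, null,
                    "test_db", "test_table", "id",
                    CatalogType.HIVE,
                    "thrift://localhost:9083",
                    "hdfs://localhost:9000/warehouse",
                    new ArrayList<>(), null);
            // Expected entries include:
            //   connector -> hudi-inlong
            //   path      -> hdfs://localhost:9000/warehouse/test_db.db/test_table
            //   hoodie.database.name -> test_db, hoodie.table.name -> test_table
            //   hoodie.datasource.write.recordkey.field -> id
            //   hive_sync.enabled -> true, hive_sync.mode -> hms
            //   hive_sync.metastore.uris -> thrift://localhost:9083
            Map<String, String> options = node.tableOptions();
            options.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }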
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
new file mode 100644
index 000000000..dabfd54ac
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -0,0 +1,57 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+/**
+ * Test for {@link HudiLoadNode}
+ */
+public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
+
+    @Override
+    public HudiLoadNode getTestObject() {
+        Map<String, Object> properties = Maps.newHashMap();
+        return new HudiLoadNode("1", "test_hudi",
+                Collections.singletonList(new FieldInfo("id", new StringFormatInfo())),
+                Collections.singletonList(new FieldRelation(new FieldInfo("id", new StringFormatInfo()),
+                        new FieldInfo("id", new StringFormatInfo()))),
+                null,
+                null,
+                1,
+                null,
+                "test_db",
+                "test_table",
+                "id",
+                CatalogType.HIVE,
+                "thrift://localhost:9083",
+                "hdfs://localhost:9000/user/hudi/warehouse",
+                new ArrayList<>(),
+                new ArrayList<>());
+    }
+}
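
HudiLoadNodeTest delegates the actual assertion to SerializeBaseTest, which round-trips the node through JSON. A standalone sketch of that round trip (not part of this patch), using the same Flink-shaded Jackson classes the node annotations come from and assuming the type discriminator is declared on the Node interface, as the @JsonSubTypes registration above suggests:

    import java.util.ArrayList;
    import java.util.Collections;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.sort.formats.common.StringFormatInfo;
    import org.apache.inlong.sort.protocol.FieldInfo;
    import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
    import org.apache.inlong.sort.protocol.node.Node;
    import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
    import org.apache.inlong.sort.protocol.transformation.FieldRelation;

    public class HudiLoadNodeRoundTripSketch {
        public static void main(String[] args) throws Exception {
            FieldInfo id = new FieldInfo("id", new StringFormatInfo());
            // Mirrors getTestObject() above.
            HudiLoadNode node = new HudiLoadNode("1", "test_hudi",
                    Collections.singletonList(id),
                    Collections.singletonList(new FieldRelation(id, id)),
                    null, null, 1, null,
                    "test_db", "test_table", "id",
                    CatalogType.HIVE,
                    "thrift://localhost:9083",
                    "hdfs://localhost:9000/user/hudi/warehouse",
                    new ArrayList<>(), new ArrayList<>());
            ObjectMapper mapper = new ObjectMapper();
            // Serialize against the Node interface so the "hudiLoad" type id is written.
            String json = mapper.writerFor(Node.class).writeValueAsString(node);
            Node back = mapper.readValue(json, Node.class);
            System.out.println(back instanceof HudiLoadNode);  // expected: true
            // Lombok's @Data/@EqualsAndHashCode make this a field-by-field comparison.
            System.out.println(node.equals(back));             // expected: true
        }
    }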
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
new file mode 100644
index 000000000..0cb9802d1
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -0,0 +1,186 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements. See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership. The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Hudi SQL parser.
+ */
+public class HudiNodeSqlParserTest extends AbstractTestBase {
+
+    private MySqlExtractNode buildMySQLExtractNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()),
+                new FieldInfo("event_type", new StringFormatInfo()));
+        // To make the downstream load run in append mode, add this config
+        Map<String, String> map = new HashMap<>();
+        map.put("append-mode", "true");
+        return new MySqlExtractNode(id, "mysql_input", fields,
+                null, map, "id",
+                Collections.singletonList("work1"), "localhost", "root", 
"123456",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private HudiLoadNode buildHudiLoadNodeWithHadoopCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("salary", new StringFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("table.type", "MERGE_ON_READ");
+        extList.add(map);
+
+        return new HudiLoadNode(
+                "hudi",
+                "hudi_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                "inlong",
+                "inlong_hudi",
+                null,
+                CatalogType.HADOOP,
+                null,
+                "hdfs://localhost:9000/hudi/warehouse",
+                extList,
+                Lists.newArrayList());
+    }
+
+    private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo())));
+        List<HashMap<String, String>> extList = new ArrayList<>();
+        HashMap<String, String> map = new HashMap<>();
+        map.put("table.type", "MERGE_ON_READ");
+        extList.add(map);
+
+        // set HIVE_CONF_DIR, or set uri and warehouse explicitly
+        return new HudiLoadNode(
+                "hudi",
+                "hudi_table_name",
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                null,
+                "inlong",
+                "inlong_hudi",
+                null,
+                CatalogType.HIVE,
+                "thrift://localhost:9083",
+                "/hive/warehouse",
+                extList,
+                Lists.newArrayList());
+    }
+
+    /**
+     * build node relation
+     *
+     * @param inputs  extract node
+     * @param outputs load node
+     * @return node relation
+     */
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Test
+    public void testHudi() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMySQLExtractNode("1");
+        Node outputNode = buildHudiLoadNodeWithHiveCatalog();
+        StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("group_id", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+        Assert.assertTrue(!result.getLoadSqls().isEmpty() && !result.getCreateTableSqls().isEmpty());
+    }
+}
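
The final assertion above only checks that some load and create-table statements were produced. A hypothetical follow-up check (not part of this patch) that could be added to the test class to pin down the Hudi statement; the option keys come from HudiLoadNode.tableOptions(), while the exact DDL formatting is left to FlinkSqlParser:

    private static void assertHudiCreateTable(FlinkSqlParseResult result) {
        String hudiDdl = result.getCreateTableSqls().stream()
                .filter(sql -> sql.contains("hudi-inlong"))
                .findFirst()
                .orElseThrow(() -> new AssertionError("no Hudi create-table statement generated"));
        // hive_sync.enabled is always set to true by HudiLoadNode.tableOptions().
        Assert.assertTrue(hudiDdl.contains("hive_sync.enabled"));
    }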
