This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d2523918 [INLONG-4170][Sort] Add FileSystem load node (#4173)
7d2523918 is described below

commit 7d2523918cc34e437ce72a7e2fef1b212c334aa9
Author: ganfengtan <[email protected]>
AuthorDate: Sun May 15 10:57:51 2022 +0800

    [INLONG-4170][Sort] Add FileSystem load node (#4173)
---
 .../apache/inlong/sort/protocol/node/LoadNode.java |   6 +-
 .../org/apache/inlong/sort/protocol/node/Node.java |   5 +-
 .../protocol/node/load/FileSystemLoadNode.java     | 123 +++++++++++++++++++++
 inlong-sort/sort-single-tenant/pom.xml             |  11 +-
 .../org.apache.flink.core.fs.FileSystemFactory     |  16 ---
 .../flink/parser/FlinkSqlParserTest.java           |  49 ++++++++
 licenses/inlong-sort/LICENSE                       |   1 +
 licenses/inlong-sort/NOTICE                        |  22 ++++
 pom.xml                                            |   5 +
 9 files changed, 216 insertions(+), 22 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 59e6189d6..73a431e02 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -27,8 +27,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -48,7 +49,8 @@ import java.util.Map;
 @JsonSubTypes({
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
         @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
-        @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
+        @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
+        @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = 
"fileSystemLoad")
 })
 @NoArgsConstructor
 @Data
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 56a2cb62b..7c2afbe43 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -25,6 +25,8 @@ import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
@@ -49,7 +51,8 @@ import java.util.TreeMap;
         @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
         @JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
         @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
-        @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hbaseLoad")
+        @JsonSubTypes.Type(value = FileSystemLoadNode.class, name = 
"fileSystemLoad"),
+        @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
 })
 public interface Node {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
new file mode 100644
index 000000000..da8c41b93
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("fileSystemLoad")
+@Data
+@NoArgsConstructor
+public class FileSystemLoadNode extends LoadNode implements Serializable {
+
+    private static final long serialVersionUID = -4836034838166667371L;
+
+    private static final String trigger = "sink.partition-commit.trigger";
+    private static final String delay = "sink.partition-commit.delay";
+    private static final String policyKind = 
"sink.partition-commit.policy.kind";
+    private static final String waterMarkZone = 
"sink.partition-commit.watermark-time-zone";
+    private static final String rollingRolloverInterval = 
"sink.rolling-policy.rollover-interval";
+    private static final String rollingPolicyFileSize = 
"sink.rolling-policy.file-size";
+
+    @JsonProperty("format")
+    @Nonnull
+    private String format;
+
+    @JsonProperty("path")
+    @Nonnull
+    private String path;
+
+    @JsonProperty("partitionFields")
+    private List<FieldInfo> partitionFields;
+
+    @JsonProperty("watermarkField")
+    private WatermarkField watermarkField;
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonCreator
+    public FileSystemLoadNode(@JsonProperty("id") String id,
+            @JsonProperty("name") String name,
+            @JsonProperty("fields") List<FieldInfo> fields,
+            @JsonProperty("fieldRelationShips") List<FieldRelationShip> 
fieldRelationShips,
+            @JsonProperty("filters") List<FilterFunction> filters,
+            @Nonnull @JsonProperty("path") String path,
+            @Nonnull @JsonProperty("format") String format,
+            @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+            @JsonProperty("properties") Map<String, String> properties,
+            @JsonProperty("parFields") List<FieldInfo> partitionFields,
+            @JsonProperty("watermarkField") WatermarkField watermarkField) {
+        super(id, name, fields, fieldRelationShips, filters, null, 
sinkParallelism, properties);
+        this.format = Preconditions.checkNotNull(format, "format type is 
null");
+        this.path = Preconditions.checkNotNull(path, "path is null");
+        this.partitionFields = partitionFields;
+        this.watermarkField = watermarkField;
+        this.name = name;
+    }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> map = super.tableOptions();
+        map.put("connector", "filesystem");
+        map.put("path", path);
+        map.put("format", format);
+        if (null != partitionFields) {
+            Map<String, String> properties = super.getProperties();
+            if (null == properties || !properties.containsKey(trigger)) {
+                map.put(trigger, "process-time");
+            }
+            if (null == properties || !properties.containsKey(delay)) {
+                map.put(delay, "10s");
+            }
+            if (null == properties || !properties.containsKey(policyKind)) {
+                map.put(policyKind, "metastore,success-file");
+            }
+        }
+        if (!map.containsKey(rollingRolloverInterval)) {
+            map.put(rollingRolloverInterval, "1min");
+        }
+        if (!map.containsKey(rollingPolicyFileSize)) {
+            map.put(rollingPolicyFileSize, "128MB");
+        }
+        return map;
+    }
+
+    @Override
+    public String genTableName() {
+        return "node_" + super.getId() + "_" + name;
+    }
+
+}
diff --git a/inlong-sort/sort-single-tenant/pom.xml 
b/inlong-sort/sort-single-tenant/pom.xml
index d87564b4e..613f34ce7 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -18,9 +18,9 @@
     under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns="http://maven.apache.org/POM/4.0.0"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.inlong</groupId>
         <artifactId>inlong-sort</artifactId>
@@ -129,6 +129,11 @@
             <artifactId>flink-avro</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-orc_${scala.binary.version}</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-format-base</artifactId>
diff --git 
a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 
b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
deleted file mode 100644
index 9b55f0991..000000000
--- 
a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.inlong.sort.singletenant.flink.hive.filesystems.CHDFSFsFactory
diff --git 
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
 
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 3942e5fae..5a92c3a44 100644
--- 
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ 
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -140,6 +141,27 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 null, null);
     }
 
+    private FileSystemLoadNode buildFileSystemNode(String id) {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new 
LongFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("salary", new FloatFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelationShip> relations = Arrays
+                .asList(new FieldRelationShip(new FieldInfo("id", new 
LongFormatInfo()),
+                                new FieldInfo("id", new LongFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("name", new 
StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("age", new 
IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo())),
+                        new FieldRelationShip(new FieldInfo("ts", new 
TimestampFormatInfo()),
+                                new FieldInfo("ts", new TimestampFormatInfo()))
+                );
+        return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
+                null, "hdfs://localhost:9000/file", "json",
+                1, null, null, null);
+    }
+
     /**
      * Test flink sql mysql cdc to hive
      *
@@ -178,6 +200,33 @@ public class FlinkSqlParserTest extends AbstractTestBase {
         parser.parse();
     }
 
+    /**
+     * Test flink sql mysql cdc to file system
+     *
+     * @throws Exception The exception may throws when execute the case
+     */
+    @Test
+    public void testToFileSystem() {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
+        Node mysqlExtractNode = buildMySQLExtractNode("1");
+        Node fileSystemNode = buildFileSystemNode("2");
+        //mysql-->HDFS OR LOCAL FILE
+        StreamInfo streamInfoToHDFS = new StreamInfo("1L", 
Arrays.asList(mysqlExtractNode, fileSystemNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+                        Collections.singletonList(fileSystemNode))));
+        GroupInfo groupInfoToHDFS = new GroupInfo("1", 
Collections.singletonList(streamInfoToHDFS));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfoToHDFS);
+        parser.parse();
+    }
+
     /**
      * Test flink sql parse
      *
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 5b6fc1d30..cc98721a2 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -536,6 +536,7 @@ The text of each license is the standard Apache 2.0 license.
   org.apache.flink:flink-json:1.13.5 - Flink : Formats : Json 
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-json), 
(The Apache Software License, Version 2.0)
   org.apache.flink:flink-csv:1.13.5 - Flink : Formats : Csv 
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-csv), 
(The Apache Software License, Version 2.0)
   org.apache.flink:flink-parquet_2.11:1.13.5 - Flink : Formats : Parquet 
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-parquet),
 (The Apache Software License, Version 2.0)
+  org.apache.flink:flink-orc_2.11:1.13.5 - Flink : Formats : Orc 
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-orc), 
(The Apache Software License, Version 2.0)
   org.apache.flink:flink-scala_2.11:1.13.5 - Flink : Scala 
(https://github.com/apache/flink/tree/release-1.13.5/flink-scala), (The Apache 
Software License, Version 2.0)
   org.apache.flink:flink-shaded-asm-7:7.1-13.0 - flink-shaded-asm-7 
(https://github.com/apache/flink-shaded/tree/release-7.0/flink-shaded-asm-6), 
(The Apache Software License, Version 2.0)
   org.apache.flink:flink-shaded-jackson:2.12.1-13.0 - flink-shaded-jackson-2 
(http://flink.apache.org/flink-shaded-jackson-parent/flink-shaded-jackson), 
(The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort/NOTICE b/licenses/inlong-sort/NOTICE
index 8006a20e2..78caeeaa6 100644
--- a/licenses/inlong-sort/NOTICE
+++ b/licenses/inlong-sort/NOTICE
@@ -711,6 +711,28 @@ Copyright 2014-2021 The Apache Software Foundation
 
 
 
+========================================================================
+
+Flink : Formats : Orc NOTICE
+========================================================================
+
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Formats : Orc
+Copyright 2014-2021 The Apache Software Foundation
+
+
+
 ========================================================================
 
 Flink : Scala NOTICE
diff --git a/pom.xml b/pom.xml
index bd2fe4c31..58692b65f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -908,6 +908,11 @@
                 <artifactId>flink-csv</artifactId>
                 <version>${flink.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-orc_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-shaded-jackson</artifactId>

Reply via email to