This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7d2523918 [INLONG-4170][Sort] Add FileSystem load node (#4173)
7d2523918 is described below
commit 7d2523918cc34e437ce72a7e2fef1b212c334aa9
Author: ganfengtan <[email protected]>
AuthorDate: Sun May 15 10:57:51 2022 +0800
[INLONG-4170][Sort] Add FileSystem load node (#4173)
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 6 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 5 +-
.../protocol/node/load/FileSystemLoadNode.java | 123 +++++++++++++++++++++
inlong-sort/sort-single-tenant/pom.xml | 11 +-
.../org.apache.flink.core.fs.FileSystemFactory | 16 ---
.../flink/parser/FlinkSqlParserTest.java | 49 ++++++++
licenses/inlong-sort/LICENSE | 1 +
licenses/inlong-sort/NOTICE | 22 ++++
pom.xml | 5 +
9 files changed, 216 insertions(+), 22 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 59e6189d6..73a431e02 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -27,8 +27,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -48,7 +49,8 @@ import java.util.Map;
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
@JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
- @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
+ @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad"),
+ @JsonSubTypes.Type(value = FileSystemLoadNode.class, name =
"fileSystemLoad")
})
@NoArgsConstructor
@Data
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 56a2cb62b..7c2afbe43 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -25,6 +25,8 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
@@ -49,7 +51,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad"),
@JsonSubTypes.Type(value = DistinctNode.class, name = "distinct"),
@JsonSubTypes.Type(value = HiveLoadNode.class, name = "hiveLoad"),
- @JsonSubTypes.Type(value = HiveLoadNode.class, name = "hbaseLoad")
+ @JsonSubTypes.Type(value = FileSystemLoadNode.class, name =
"fileSystemLoad"),
+ @JsonSubTypes.Type(value = HbaseLoadNode.class, name = "hbaseLoad")
})
public interface Node {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
new file mode 100644
index 000000000..da8c41b93
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/FileSystemLoadNode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.WatermarkField;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Load node describing a sink that is written through the Flink {@code filesystem} connector.
+ *
+ * <p>The node is registered for JSON (de)serialization under the type name
+ * {@code fileSystemLoad}. {@link #tableOptions()} emits the connector, path and format
+ * options and fills in defaults for the partition-commit and rolling-policy options when
+ * they have not been supplied.
+ */
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("fileSystemLoad")
+@Data
+@NoArgsConstructor
+public class FileSystemLoadNode extends LoadNode implements Serializable {
+
+ private static final long serialVersionUID = -4836034838166667371L;
+
+ // Table option keys that tableOptions() may fill in with default values.
+ private static final String trigger = "sink.partition-commit.trigger";
+ private static final String delay = "sink.partition-commit.delay";
+ private static final String policyKind =
"sink.partition-commit.policy.kind";
+ // NOTE(review): waterMarkZone is not referenced anywhere in this class as shown here;
+ // confirm whether it is still needed.
+ private static final String waterMarkZone =
"sink.partition-commit.watermark-time-zone";
+ private static final String rollingRolloverInterval =
"sink.rolling-policy.rollover-interval";
+ private static final String rollingPolicyFileSize =
"sink.rolling-policy.file-size";
+
+ /** Value emitted as the {@code format} table option; the constructor rejects null. */
+ @JsonProperty("format")
+ @Nonnull
+ private String format;
+
+ /** Value emitted as the {@code path} table option; the constructor rejects null. */
+ @JsonProperty("path")
+ @Nonnull
+ private String path;
+
+ /**
+ * Partition fields. Only null-ness is inspected in this class: any non-null value
+ * (including an empty list) enables the partition-commit defaults in tableOptions().
+ */
+ @JsonProperty("partitionFields")
+ private List<FieldInfo> partitionFields;
+
+ /** Watermark field; stored by the constructor but not read elsewhere in this class. */
+ @JsonProperty("watermarkField")
+ private WatermarkField watermarkField;
+
+ /** Node name; also forwarded to the superclass constructor and used by genTableName(). */
+ @JsonProperty("name")
+ private String name;
+
+ /**
+ * Creates a file system load node; also used as the JSON creator.
+ *
+ * <p>NOTE(review): the creator reads the partition fields from the JSON property
+ * {@code parFields}, while the field itself is annotated {@code partitionFields} -
+ * confirm that both names are intended to be accepted.
+ *
+ * @param id node id, forwarded to the superclass
+ * @param name node name, forwarded to the superclass and kept locally
+ * @param fields fields of this node, forwarded to the superclass
+ * @param fieldRelationShips field relations, forwarded to the superclass
+ * @param filters filter functions, forwarded to the superclass
+ * @param path target path, must not be null
+ * @param format format identifier, must not be null
+ * @param sinkParallelism optional sink parallelism, forwarded to the superclass
+ * @param properties extra properties, forwarded to the superclass
+ * @param partitionFields optional partition fields
+ * @param watermarkField optional watermark field
+ * @throws NullPointerException if {@code format} or {@code path} is null
+ */
+ @JsonCreator
+ public FileSystemLoadNode(@JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelationShips") List<FieldRelationShip>
fieldRelationShips,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @Nonnull @JsonProperty("path") String path,
+ @Nonnull @JsonProperty("format") String format,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @JsonProperty("parFields") List<FieldInfo> partitionFields,
+ @JsonProperty("watermarkField") WatermarkField watermarkField) {
+ // The sixth superclass argument is always null here; presumably the filter
+ // strategy - TODO confirm against the LoadNode constructor.
+ super(id, name, fields, fieldRelationShips, filters, null,
sinkParallelism, properties);
+ this.format = Preconditions.checkNotNull(format, "format type is
null");
+ this.path = Preconditions.checkNotNull(path, "path is null");
+ this.partitionFields = partitionFields;
+ this.watermarkField = watermarkField;
+ this.name = name;
+ }
+
+ /**
+ * Builds the table options for this node.
+ *
+ * <p>Starts from the options returned by the superclass, then sets {@code connector} to
+ * {@code filesystem} together with {@code path} and {@code format}. When partition
+ * fields are present, the partition-commit trigger, delay and policy kind default to
+ * {@code process-time}, {@code 10s} and {@code metastore,success-file} unless the node
+ * properties already contain the key. The rolling-policy rollover interval and file
+ * size default to {@code 1min} and {@code 128MB} unless already present in the map.
+ *
+ * @return the option map obtained from the superclass, with the entries above added
+ */
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> map = super.tableOptions();
+ map.put("connector", "filesystem");
+ map.put("path", path);
+ map.put("format", format);
+ if (null != partitionFields) {
+ // Partition-commit defaults are decided by looking at the raw node properties.
+ Map<String, String> properties = super.getProperties();
+ if (null == properties || !properties.containsKey(trigger)) {
+ map.put(trigger, "process-time");
+ }
+ if (null == properties || !properties.containsKey(delay)) {
+ map.put(delay, "10s");
+ }
+ if (null == properties || !properties.containsKey(policyKind)) {
+ map.put(policyKind, "metastore,success-file");
+ }
+ }
+ // NOTE(review): the rolling-policy checks look at the option map rather than the
+ // properties; this presumably relies on super.tableOptions() copying the
+ // properties into the map - confirm.
+ if (!map.containsKey(rollingRolloverInterval)) {
+ map.put(rollingRolloverInterval, "1min");
+ }
+ if (!map.containsKey(rollingPolicyFileSize)) {
+ map.put(rollingPolicyFileSize, "128MB");
+ }
+ return map;
+ }
+
+ /**
+ * Generates the table name for this node.
+ *
+ * @return {@code "node_" + id + "_" + name}
+ */
+ @Override
+ public String genTableName() {
+ return "node_" + super.getId() + "_" + name;
+ }
+
+}
diff --git a/inlong-sort/sort-single-tenant/pom.xml
b/inlong-sort/sort-single-tenant/pom.xml
index d87564b4e..613f34ce7 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -18,9 +18,9 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-sort</artifactId>
@@ -129,6 +129,11 @@
<artifactId>flink-avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc_${scala.binary.version}</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-base</artifactId>
diff --git
a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
deleted file mode 100644
index 9b55f0991..000000000
---
a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.inlong.sort.singletenant.flink.hive.filesystems.CHDFSFsFactory
diff --git
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 3942e5fae..5a92c3a44 100644
---
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -34,6 +34,7 @@ import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -140,6 +141,27 @@ public class FlinkSqlParserTest extends AbstractTestBase {
null, null);
}
+ /**
+ * Builds the file system load node used by the file system test case.
+ *
+ * <p>The node is named {@code hdfs_output}, writes {@code json} to
+ * {@code hdfs://localhost:9000/file} with sink parallelism 1, and is created without
+ * filters, properties, partition fields or a watermark field (all passed as null).
+ *
+ * @param id the node id
+ * @return the configured file system load node
+ */
+ private FileSystemLoadNode buildFileSystemNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ // Relations map id, name, age and ts onto themselves; "salary" is declared as a
+ // field above but has no relation here.
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new
IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new
TimestampFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()))
+ );
+ return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
+ null, "hdfs://localhost:9000/file", "json",
+ 1, null, null, null);
+ }
+
/**
* Test flink sql mysql cdc to hive
*
@@ -178,6 +200,33 @@ public class FlinkSqlParserTest extends AbstractTestBase {
parser.parse();
}
+ /**
+ * Tests parsing a Flink SQL job that goes from MySQL CDC to a file system sink.
+ *
+ * <p>The test makes no assertions: it passes as long as {@code parser.parse()} completes
+ * without throwing. The method itself declares no checked exceptions.
+ */
+ @Test
+ public void testToFileSystem() {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node mysqlExtractNode = buildMySQLExtractNode("1");
+ Node fileSystemNode = buildFileSystemNode("2");
+ // MySQL --> HDFS or local file
+ StreamInfo streamInfoToHDFS = new StreamInfo("1L",
Arrays.asList(mysqlExtractNode, fileSystemNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(mysqlExtractNode),
+ Collections.singletonList(fileSystemNode))));
+ GroupInfo groupInfoToHDFS = new GroupInfo("1",
Collections.singletonList(streamInfoToHDFS));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfoToHDFS);
+ parser.parse();
+ }
+
/**
* Test flink sql parse
*
diff --git a/licenses/inlong-sort/LICENSE b/licenses/inlong-sort/LICENSE
index 5b6fc1d30..cc98721a2 100644
--- a/licenses/inlong-sort/LICENSE
+++ b/licenses/inlong-sort/LICENSE
@@ -536,6 +536,7 @@ The text of each license is the standard Apache 2.0 license.
org.apache.flink:flink-json:1.13.5 - Flink : Formats : Json
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-json),
(The Apache Software License, Version 2.0)
org.apache.flink:flink-csv:1.13.5 - Flink : Formats : Csv
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-csv),
(The Apache Software License, Version 2.0)
org.apache.flink:flink-parquet_2.11:1.13.5 - Flink : Formats : Parquet
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-parquet),
(The Apache Software License, Version 2.0)
+ org.apache.flink:flink-orc_2.11:1.13.5 - Flink : Formats : Orc
(https://github.com/apache/flink/tree/release-1.13.5/flink-formats/flink-orc),
(The Apache Software License, Version 2.0)
org.apache.flink:flink-scala_2.11:1.13.5 - Flink : Scala
(https://github.com/apache/flink/tree/release-1.13.5/flink-scala), (The Apache
Software License, Version 2.0)
org.apache.flink:flink-shaded-asm-7:7.1-13.0 - flink-shaded-asm-7
(https://github.com/apache/flink-shaded/tree/release-7.0/flink-shaded-asm-6),
(The Apache Software License, Version 2.0)
org.apache.flink:flink-shaded-jackson:2.12.1-13.0 - flink-shaded-jackson-2
(http://flink.apache.org/flink-shaded-jackson-parent/flink-shaded-jackson),
(The Apache Software License, Version 2.0)
diff --git a/licenses/inlong-sort/NOTICE b/licenses/inlong-sort/NOTICE
index 8006a20e2..78caeeaa6 100644
--- a/licenses/inlong-sort/NOTICE
+++ b/licenses/inlong-sort/NOTICE
@@ -711,6 +711,28 @@ Copyright 2014-2021 The Apache Software Foundation
+========================================================================
+
+Flink : Formats : Orc NOTICE
+========================================================================
+
+// ------------------------------------------------------------------
+// NOTICE file corresponding to the section 4d of The Apache License,
+// Version 2.0, in this case for Apache Flink
+// ------------------------------------------------------------------
+
+Apache Flink
+Copyright 2006-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+
+Flink : Formats : Orc
+Copyright 2014-2021 The Apache Software Foundation
+
+
+
========================================================================
Flink : Scala NOTICE
diff --git a/pom.xml b/pom.xml
index bd2fe4c31..58692b65f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -908,6 +908,11 @@
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-orc_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>