This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b650bef0 [Feature][Connector-V2] Support user-defined schema for
source connectors (#2392)
6b650bef0 is described below
commit 6b650bef07a152644a94ed063fa62310d8b7054a
Author: TyrantLucifer <[email protected]>
AuthorDate: Thu Aug 11 11:18:25 2022 +0800
[Feature][Connector-V2] Support user-defined schema for source connectors
(#2392)
* [Feature][Connector-V2] Support user-defined schema for source connectors
---
.../seatunnel/common/schema/SeatunnelSchema.java | 217 +++++++++++++++++++++
.../connector/common/schema/SchemaParseTest.java | 83 ++++++++
.../src/test/resources/complex.schema.conf | 38 ++++
.../src/test/resources/simple.schema.conf | 37 ++++
4 files changed, 375 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
new file mode 100644
index 000000000..8acad190c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.schema;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * A user-defined schema for source connectors, parsed from connector configuration.
+ *
+ * <p>The config passed to {@link #buildWithConfig(Config)} must contain a {@code fields} object
+ * that maps each field name to a type string, for example:
+ *
+ * <pre>{@code
+ * fields {
+ *   id = bigint
+ *   price = "decimal(30, 8)"
+ *   tags = "array<string>"
+ *   attrs = "map<string, map<string, string>>"
+ * }
+ * }</pre>
+ *
+ * <p>Type names are case-insensitive. Fields appear in the resulting {@link SeaTunnelRowType} in
+ * the order in which they are declared in the config.
+ */
+public class SeatunnelSchema {
+    private static final String FIELD_KEY = "fields";
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    private SeatunnelSchema(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    /**
+     * Returns the text between the first {@code open} and the last {@code close} character of
+     * {@code type}, with all spaces removed.
+     *
+     * @throws RuntimeException if a delimiter is missing or the delimiters are out of order
+     */
+    private static String extractEnclosed(String type, char open, char close) {
+        int start = type.indexOf(open);
+        int end = type.lastIndexOf(close);
+        // Without this check a missing delimiter surfaces as a StringIndexOutOfBoundsException.
+        if (start < 0 || end < start) {
+            throw new RuntimeException(String.format(
+                    "Field type [%s] is malformed, expected parameters enclosed in '%s' and '%s'",
+                    type, open, close));
+        }
+        return type.substring(start + 1, end).replace(" ", "");
+    }
+
+    /**
+     * Returns the index of the first comma in {@code text} that is not nested inside
+     * {@code <...>} or {@code (...)}, or -1 if there is none.
+     */
+    private static int indexOfTopLevelComma(String text) {
+        int depth = 0;
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            if (c == '<' || c == '(') {
+                depth++;
+            } else if (c == '>' || c == ')') {
+                depth--;
+            } else if (c == ',' && depth == 0) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Splits a map type such as {@code MAP<STRING,DECIMAL(10,2)>} into its key and value type
+     * strings (spaces removed).
+     *
+     * @return a two-element array {@code {keyType, valueType}}
+     */
+    private static String[] parseMapGeneric(String type) {
+        String genericType = extractEnclosed(type, '<', '>');
+        // Split on the top-level comma only: the first comma may belong to a nested type,
+        // e.g. "DECIMAL(10,2),STRING" or "MAP<STRING,INT>,STRING".
+        int index = indexOfTopLevelComma(genericType);
+        if (index < 0) {
+            throw new RuntimeException(String.format(
+                    "Map type should assign key and value generic types, but got [%s]", type));
+        }
+        return new String[]{genericType.substring(0, index), genericType.substring(index + 1)};
+    }
+
+    /** Returns the element type string of an array type such as {@code ARRAY<INT>} (spaces removed). */
+    private static String parseArrayGeneric(String type) {
+        return extractEnclosed(type, '<', '>');
+    }
+
+    /**
+     * Parses the precision and scale of a decimal type such as {@code DECIMAL(30, 8)}.
+     *
+     * @return a two-element array {@code {precision, scale}}
+     * @throws RuntimeException if precision and scale are not given as exactly two integers
+     */
+    private static int[] parseDecimalPS(String type) {
+        // A bare "DECIMAL" used to yield an invalid DecimalType(0, 0).
+        if (type.indexOf('(') < 0) {
+            throw new RuntimeException("Decimal type should assign precision and scale information");
+        }
+        String[] split = extractEnclosed(type, '(', ')').split(",");
+        if (split.length != 2) {
+            throw new RuntimeException("Decimal type should assign precision and scale information");
+        }
+        try {
+            return new int[]{Integer.parseInt(split[0]), Integer.parseInt(split[1])};
+        } catch (NumberFormatException e) {
+            throw new RuntimeException(String.format(
+                    "Decimal precision and scale should be integers, but got [%s]", type), e);
+        }
+    }
+
+    /** Returns the type name that precedes any {@code <...>} or {@code (...)} parameters, trimmed. */
+    private static String baseTypeName(String type) {
+        int end = type.length();
+        int angle = type.indexOf('<');
+        if (angle >= 0) {
+            end = angle;
+        }
+        int paren = type.indexOf('(');
+        if (paren >= 0 && paren < end) {
+            end = paren;
+        }
+        return type.substring(0, end).trim();
+    }
+
+    /**
+     * Converts a type string such as {@code "int"}, {@code "decimal(30, 8)"},
+     * {@code "array<string>"} or {@code "map<string, array<int>>"} into a SeaTunnel data type.
+     *
+     * @throws RuntimeException if the type is unknown, malformed or unsupported
+     */
+    private static SeaTunnelDataType<?> parseTypeByString(String type) {
+        // Locale.ROOT keeps the case mapping locale-independent: under a Turkish default locale
+        // "int".toUpperCase() yields "İNT", which matches no SqlType constant.
+        String normalized = type.trim().toUpperCase(Locale.ROOT);
+        // Dispatch on the leading type name rather than on substring checks, so that e.g.
+        // "ARRAY<MAP<..>>" is not mistaken for a map and "INT(5,2)" is not mistaken for a decimal.
+        String baseName = baseTypeName(normalized);
+        SqlType sqlType;
+        try {
+            sqlType = SqlType.valueOf(baseName);
+        } catch (IllegalArgumentException e) {
+            String errorMsg = String.format("Field type not support [%s], currently only support [array, map, string, boolean, tinyint, smallint, int, bigint, float, double, decimal, null, bytes, date, time, timestamp]", normalized);
+            throw new RuntimeException(errorMsg, e);
+        }
+        switch (sqlType) {
+            case ARRAY:
+                return parseArrayType(normalized);
+            case MAP: {
+                String[] genericTypes = parseMapGeneric(normalized);
+                return new MapType<>(parseTypeByString(genericTypes[0]), parseTypeByString(genericTypes[1]));
+            }
+            case DECIMAL: {
+                int[] precisionAndScale = parseDecimalPS(normalized);
+                return new DecimalType(precisionAndScale[0], precisionAndScale[1]);
+            }
+            default:
+                if (!baseName.equals(normalized)) {
+                    throw new RuntimeException(String.format(
+                            "Field type [%s] does not accept generic or precision parameters", normalized));
+                }
+                return parseBasicType(sqlType);
+        }
+    }
+
+    /** Resolves an array type; only arrays of string, boolean and numeric basic types are supported. */
+    private static SeaTunnelDataType<?> parseArrayType(String type) {
+        String genericType = parseArrayGeneric(type);
+        SeaTunnelDataType<?> dataType = parseTypeByString(genericType);
+        if (BasicType.STRING_TYPE.equals(dataType)) {
+            return ArrayType.STRING_ARRAY_TYPE;
+        } else if (BasicType.BOOLEAN_TYPE.equals(dataType)) {
+            return ArrayType.BOOLEAN_ARRAY_TYPE;
+        } else if (BasicType.BYTE_TYPE.equals(dataType)) {
+            return ArrayType.BYTE_ARRAY_TYPE;
+        } else if (BasicType.SHORT_TYPE.equals(dataType)) {
+            return ArrayType.SHORT_ARRAY_TYPE;
+        } else if (BasicType.INT_TYPE.equals(dataType)) {
+            return ArrayType.INT_ARRAY_TYPE;
+        } else if (BasicType.LONG_TYPE.equals(dataType)) {
+            return ArrayType.LONG_ARRAY_TYPE;
+        } else if (BasicType.FLOAT_TYPE.equals(dataType)) {
+            return ArrayType.FLOAT_ARRAY_TYPE;
+        } else if (BasicType.DOUBLE_TYPE.equals(dataType)) {
+            return ArrayType.DOUBLE_ARRAY_TYPE;
+        }
+        String errorMsg = String.format("Array type not support this genericType [%s]", genericType);
+        throw new RuntimeException(errorMsg);
+    }
+
+    /** Maps a parameterless SQL type to its SeaTunnel data type. */
+    private static SeaTunnelDataType<?> parseBasicType(SqlType sqlType) {
+        switch (sqlType) {
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case TINYINT:
+            case BYTES:
+                // NOTE(review): BYTES maps to the single-byte type, same as TINYINT; confirm
+                // whether a byte-array type was intended here.
+                return BasicType.BYTE_TYPE;
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case INT:
+                return BasicType.INT_TYPE;
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case NULL:
+                return BasicType.VOID_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new RuntimeException("Not support [row] type now");
+        }
+    }
+
+    /**
+     * Renders {@code config} to JSON and reads it back as a field-name to type-string map.
+     */
+    private static Map<String, String> convertConfig2Map(Config config) {
+        // Because the entrySet in typesafe config couldn't keep key-value order
+        // So use jackson parsing schema information into a map to keep key-value order
+        ConfigRenderOptions options = ConfigRenderOptions.concise();
+        String schema = config.root().render(options);
+        return JsonUtils.toMap(schema);
+    }
+
+    /**
+     * Builds a schema from a config containing a {@code fields} object that maps field names to
+     * type strings.
+     *
+     * @param schemaConfig the schema config; must contain the {@code fields} option
+     * @return the parsed schema, with fields in declaration order
+     * @throws RuntimeException if {@code fields} is missing or a field type cannot be parsed
+     */
+    public static SeatunnelSchema buildWithConfig(Config schemaConfig) {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(schemaConfig, FIELD_KEY);
+        if (!checkResult.isSuccess()) {
+            String errorMsg = String.format("Schema config need option [%s], please correct your config first", FIELD_KEY);
+            throw new RuntimeException(errorMsg);
+        }
+        Map<String, String> fieldsMap = convertConfig2Map(schemaConfig.getConfig(FIELD_KEY));
+        int fieldsNum = fieldsMap.size();
+        String[] fieldNames = new String[fieldsNum];
+        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[fieldsNum];
+        int i = 0;
+        for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
+            String fieldName = entry.getKey();
+            String fieldType = entry.getValue();
+            // A HOCON null value would otherwise fail later with a bare NullPointerException.
+            if (fieldType == null) {
+                throw new RuntimeException(String.format("Field [%s] should assign a type", fieldName));
+            }
+            fieldNames[i] = fieldName;
+            fieldTypes[i] = parseTypeByString(fieldType);
+            i++;
+        }
+        return new SeatunnelSchema(new SeaTunnelRowType(fieldNames, fieldTypes));
+    }
+
+    /** Returns the row type described by this schema. */
+    public SeaTunnelRowType getSeaTunnelRowType() {
+        return seaTunnelRowType;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
new file mode 100644
index 000000000..e05c69f73
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connector.common.schema;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SchemaParseTest {
+
+ @Test
+ public void testSimpleSchemaParse() throws FileNotFoundException,
URISyntaxException {
+ String path = getTestConfigFile("/simple.schema.conf");
+ Config config = ConfigFactory
+ .parseFile(new File(path))
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ config = config.getConfig("schema");
+ SeatunnelSchema seatunnelSchema =
SeatunnelSchema.buildWithConfig(config);
+ SeaTunnelRowType seaTunnelRowType =
seatunnelSchema.getSeaTunnelRowType();
+ Assertions.assertNotNull(seatunnelSchema);
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(1),
ArrayType.BYTE_ARRAY_TYPE);
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(2),
BasicType.STRING_TYPE);
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(10), new
DecimalType(30, 8));
+ }
+
+ @Test
+ public void testComplexSchemaParse() throws FileNotFoundException,
URISyntaxException {
+ String path = getTestConfigFile("/complex.schema.conf");
+ Config config = ConfigFactory
+ .parseFile(new File(path))
+
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
+ config = config.getConfig("schema");
+ SeatunnelSchema seatunnelSchema =
SeatunnelSchema.buildWithConfig(config);
+ SeaTunnelRowType seaTunnelRowType =
seatunnelSchema.getSeaTunnelRowType();
+ Assertions.assertNotNull(seatunnelSchema);
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(0),
+ new MapType<>(BasicType.STRING_TYPE, new
MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(1),
+ new MapType<>(BasicType.STRING_TYPE, new
MapType<>(BasicType.STRING_TYPE, ArrayType.INT_ARRAY_TYPE)));
+ }
+
+ public static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
+ URL resource = SchemaParseTest.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " +
configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
b/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
new file mode 100644
index 000000000..6a06dbf06
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+schema {
+ # Each entry maps a field name to its type string. Entries are read in declaration
+ # order, and SchemaParseTest checks fields by index, so keep the order stable.
+ fields {
+ # Nested generics: map values may themselves be maps or arrays.
+ map = "map<string, map<string, string>>"
+ map_array = "map<string, map<string, array<int>>>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ # Precision and scale are both required.
+ decimal = "decimal(30, 8)"
+ # Quoted so HOCON reads the type name "null" as a string, not as a null value.
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/resources/simple.schema.conf
b/seatunnel-connectors-v2/connector-common/src/test/resources/simple.schema.conf
new file mode 100644
index 000000000..6716f00cd
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-common/src/test/resources/simple.schema.conf
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+schema {
+ # Each entry maps a field name to its type string. Entries are read in declaration
+ # order, and SchemaParseTest checks fields by index, so keep the order stable.
+ fields {
+ map = "map<string, string>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ # Precision and scale are both required.
+ decimal = "decimal(30, 8)"
+ # Quoted so HOCON reads the type name "null" as a string, not as a null value.
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
+ }
+}
\ No newline at end of file