This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 713ad49f0 [INLONG-5701][Manager][Sort] Support raw format (#5702) 713ad49f0 is described below commit 713ad49f07f319f9123a551aa6b630f7a31efd51 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Fri Aug 26 16:43:09 2022 +0800 [INLONG-5701][Manager][Sort] Support raw format (#5702) --- .../apache/inlong/common/enums/DataTypeEnum.java | 3 +- .../manager/pojo/sort/util/ExtractNodeUtils.java | 4 ++ .../inlong/sort/protocol/node/format/Format.java | 3 +- .../sort/protocol/node/format/RawFormat.java | 82 ++++++++++++++++++++++ .../sort/protocol/node/format/RawFormatTest.java | 37 ++++------ 5 files changed, 104 insertions(+), 25 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java index 8eeb6436b..e7cbbe9f5 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java @@ -25,7 +25,8 @@ public enum DataTypeEnum { AVRO("avro"), JSON("json"), CANAL("canal"), - DEBEZIUM_JSON("debezium_json"); + DEBEZIUM_JSON("debezium_json"), + RAW("raw"); @Getter private final String name; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java index c94f84af4..d0e0fa4dc 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java @@ -56,6 +56,7 @@ import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; import 
org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.format.RawFormat; import java.util.List; import java.util.Map; @@ -248,6 +249,9 @@ public class ExtractNodeUtils { case DEBEZIUM_JSON: format = new DebeziumJsonFormat(); break; + case RAW: + format = new RawFormat(); + break; default: throw new IllegalArgumentException( String.format("Unsupported dataType=%s for pulsar source", dataType)); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java index 4d2b30c5f..e40e6ed79 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java @@ -38,7 +38,8 @@ import java.util.Map; @JsonSubTypes.Type(value = DebeziumJsonFormat.class, name = "debeziumJsonFormat"), @JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat"), @JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"), - @JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat") + @JsonSubTypes.Type(value = InLongMsgFormat.class, name = "inLongMsgFormat"), + @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat") }) public interface Format extends Serializable { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java new file mode 100644 index 000000000..e2281d229 --- /dev/null +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.node.format; + +import java.util.HashMap; +import java.util.Map; +import lombok.Data; +import lombok.ToString; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * The Raw format + * + * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/raw/"> + * Raw Format</a> + */ +@Data +@JsonTypeName("rawFormat") +@ToString +public class RawFormat implements Format { + + private static final long serialVersionUID = 1L; + + @JsonProperty(value = "rawCharset", defaultValue = "UTF-8") + private String rawCharset; + @JsonProperty(value = "rawEndianness", defaultValue = "big-endian") + private String rawEndianness; + + @JsonCreator + public RawFormat(@JsonProperty(value = "rawCharset", defaultValue = "UTF-8") String rawCharset, + @JsonProperty(value = "rawEndianness", defaultValue = "big-endian") String rawEndianness) { + this.rawCharset = rawCharset; + this.rawEndianness = rawEndianness; + } + + 
@JsonCreator + public RawFormat() { + this("UTF-8", "big-endian"); + } + + /** + * Return raw + * + * @return format + */ + @JsonIgnore + @Override + public String getFormat() { + return "raw"; + } + + /** + * Generate options for connector + * + * @return options + */ + public Map<String, String> generateOptions() { + Map<String, String> options = new HashMap<>(16); + options.put("format", getFormat()); + options.put("raw.charset", this.rawCharset); + options.put("raw.endianness", this.rawEndianness); + return options; + } +} diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java similarity index 54% copy from inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java copy to inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java index 8eeb6436b..6da8854c1 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java @@ -15,31 +15,22 @@ * limitations under the License. 
*/ -package org.apache.inlong.common.enums; +package org.apache.inlong.sort.protocol.node.format; -import java.util.Locale; -import lombok.Getter; +import org.apache.inlong.sort.SerializeBaseTest; -public enum DataTypeEnum { - CSV("csv"), - AVRO("avro"), - JSON("json"), - CANAL("canal"), - DEBEZIUM_JSON("debezium_json"); - - @Getter - private final String name; - - DataTypeEnum(String name) { - this.name = name; - } +/** + * Test for {@link RawFormat} + */ +public class RawFormatTest extends SerializeBaseTest<RawFormat> { - public static DataTypeEnum forName(String name) { - for (DataTypeEnum dataType : values()) { - if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) { - return dataType; - } - } - throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name)); + /** + * Get test object + * + * @return The test object + */ + @Override + public RawFormat getTestObject() { + return new RawFormat(); } }