This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 713ad49f0 [INLONG-5701][Manager][Sort] Support raw format (#5702)
713ad49f0 is described below

commit 713ad49f07f319f9123a551aa6b630f7a31efd51
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Fri Aug 26 16:43:09 2022 +0800

    [INLONG-5701][Manager][Sort] Support raw format (#5702)
---
 .../apache/inlong/common/enums/DataTypeEnum.java   |  3 +-
 .../manager/pojo/sort/util/ExtractNodeUtils.java   |  4 ++
 .../inlong/sort/protocol/node/format/Format.java   |  3 +-
 .../sort/protocol/node/format/RawFormat.java       | 82 ++++++++++++++++++++++
 .../sort/protocol/node/format/RawFormatTest.java   | 37 ++++------
 5 files changed, 104 insertions(+), 25 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 8eeb6436b..e7cbbe9f5 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -25,7 +25,8 @@ public enum DataTypeEnum {
     AVRO("avro"),
     JSON("json"),
     CANAL("canal"),
-    DEBEZIUM_JSON("debezium_json");
+    DEBEZIUM_JSON("debezium_json"),
+    RAW("raw");
 
     @Getter
     private final String name;
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index c94f84af4..d0e0fa4dc 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -56,6 +56,7 @@ import 
org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
 import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
 
 import java.util.List;
 import java.util.Map;
@@ -248,6 +249,9 @@ public class ExtractNodeUtils {
             case DEBEZIUM_JSON:
                 format = new DebeziumJsonFormat();
                 break;
+            case RAW:
+                format = new RawFormat();
+                break;
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported dataType=%s for pulsar 
source", dataType));
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
index 4d2b30c5f..e40e6ed79 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
@@ -38,7 +38,8 @@ import java.util.Map;
         @JsonSubTypes.Type(value = DebeziumJsonFormat.class, name = 
"debeziumJsonFormat"),
         @JsonSubTypes.Type(value = CanalJsonFormat.class, name = 
"canalJsonFormat"),
         @JsonSubTypes.Type(value = CsvFormat.class, name = "csvFormat"),
-        @JsonSubTypes.Type(value = InLongMsgFormat.class, name = 
"inLongMsgFormat")
+        @JsonSubTypes.Type(value = InLongMsgFormat.class, name = 
"inLongMsgFormat"),
+        @JsonSubTypes.Type(value = RawFormat.class, name = "rawFormat")
 })
 public interface Format extends Serializable {
 
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java
new file mode 100644
index 000000000..e2281d229
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/RawFormat.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Data;
+import lombok.ToString;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * The Raw format
+ *
+ * @see <a 
herf="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/raw/";>
+ *         Raw Format</a>
+ */
+@Data
+@JsonTypeName("rawFormat")
+@ToString
+public class RawFormat implements Format {
+
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty(value = "rawCharset", defaultValue = "UTF-8")
+    private String rawCharset;
+    @JsonProperty(value = "rawEndianness", defaultValue = "big-endian")
+    private String rawEndianness;
+
+    @JsonCreator
+    public RawFormat(@JsonProperty(value = "rawCharset", defaultValue = 
"UTF-8") String rawCharset,
+            @JsonProperty(value = "rawEndianness", defaultValue = 
"big-endian") String rawEndianness) {
+        this.rawCharset = rawCharset;
+        this.rawEndianness = rawEndianness;
+    }
+
+    @JsonCreator
+    public RawFormat() {
+        this("UTF-8", "big-endian");
+    }
+
+    /**
+     * Return raw
+     *
+     * @return format
+     */
+    @JsonIgnore
+    @Override
+    public String getFormat() {
+        return "raw";
+    }
+
+    /**
+     * Generate options for connector
+     *
+     * @return options
+     */
+    public Map<String, String> generateOptions() {
+        Map<String, String> options = new HashMap<>(16);
+        options.put("format", getFormat());
+        options.put("raw.charset", this.rawCharset);
+        options.put("raw.endianness", this.rawEndianness);
+        return options;
+    }
+}
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
similarity index 54%
copy from 
inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
copy to 
inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
index 8eeb6436b..6da8854c1 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/RawFormatTest.java
@@ -15,31 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.enums;
+package org.apache.inlong.sort.protocol.node.format;
 
-import java.util.Locale;
-import lombok.Getter;
+import org.apache.inlong.sort.SerializeBaseTest;
 
-public enum DataTypeEnum {
-    CSV("csv"),
-    AVRO("avro"),
-    JSON("json"),
-    CANAL("canal"),
-    DEBEZIUM_JSON("debezium_json");
-
-    @Getter
-    private final String name;
-
-    DataTypeEnum(String name) {
-        this.name = name;
-    }
+/**
+ * Test for {@link RawFormat}
+ */
+public class RawFormatTest extends SerializeBaseTest<RawFormat> {
 
-    public static DataTypeEnum forName(String name) {
-        for (DataTypeEnum dataType : values()) {
-            if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) {
-                return dataType;
-            }
-        }
-        throw new IllegalArgumentException(String.format("Unsupport dataType 
for Inlong:%s", name));
+    /**
+     * Get test object
+     *
+     * @return The test object
+     */
+    @Override
+    public RawFormat getTestObject() {
+        return new RawFormat();
     }
 }

Reply via email to