This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d62115dd6c [INLONG-9594][sort] Need to remove duplicate classes in the 
same directory under different sort format modules (#9595)
d62115dd6c is described below

commit d62115dd6cfb09110ca703dcd974fcbed3eeefc3
Author: baomingyu <baomingy...@163.com>
AuthorDate: Mon Jan 22 14:15:08 2024 +0800

    [INLONG-9594][sort] Need to remove duplicate classes in the same directory 
under different sort format modules (#9595)
---
 .../formats/inlongmsg/IgnoreFailureHandler.java    |   2 +-
 .../sort/formats/inlongmsg/InLongMsgOptions.java   |  27 +++++
 .../sort/formats/inlongmsg/FailureHandler.java     |  54 ----------
 .../formats/inlongmsg/IgnoreFailureHandler.java    |  52 ----------
 .../sort/formats/inlongmsg/InLongMsgBody.java      | 105 -------------------
 .../sort/formats/inlongmsg/InLongMsgHead.java      | 113 ---------------------
 .../formats/inlongmsg/InLongMsgMixedValidator.java |  40 --------
 .../sort/formats/inlongmsg/InLongMsgOptions.java   |  77 --------------
 .../sort/formats/inlongmsg/InLongMsgValidator.java |  50 ---------
 .../sort/formats/inlongmsg/NoOpFailureHandler.java |  55 ----------
 10 files changed, 28 insertions(+), 547 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
index 33f46b1c2b..bb57bddfba 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
@@ -34,7 +34,7 @@ public class IgnoreFailureHandler implements FailureHandler {
 
     @Override
     public void onParsingBodyFailure(byte[] body, Exception exception) {
-        LOG.warn("Cannot properly parse the body.", exception);
+        LOG.warn("Cannot properly parse the body: {}.", new String(body), 
exception);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
index e07b102222..4d6ccf860f 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -158,4 +158,31 @@ public class InLongMsgOptions {
                     .defaultValue(DEFAULT_INCLUDE_UPDATE_BEFORE)
                     .withDescription(
                             "True if the retain predefined field should be 
skip the predefined Field. (true by default).");
+
+    public static final ConfigOption<Boolean> CSV_IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("csv.ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
+                            + "The extra column will be skipped");
+
+    public static final ConfigOption<Boolean> CSV_IGNORE_TRAILING_UNMAPPABLE =
+            ConfigOptions.key("csv.ignore-trailing-unmappable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
+                            + "The extra column will be skipped");
+
+    public static final ConfigOption<Boolean> 
CSV_INSERT_NULLS_FOR_MISSING_COLUMNS =
+            ConfigOptions.key("csv.insert-nulls-for-missing-columns")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("For missing columns, insert null.");
+
+    public static final ConfigOption<Boolean> CSV_EMPTY_STRING_AS_NULL =
+            ConfigOptions.key("csv.empty-string-as-null")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("if the string value is empty, make it as 
null");
+
 }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
deleted file mode 100644
index 00b0249d51..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import java.io.Serializable;
-
-/**
- * Interface to handle the failure on parsing InLongMsg data.
- */
-public interface FailureHandler extends Serializable {
-
-    /**
-     * This method is called when there is a failure occurred while parsing 
InLongMsg head.
-     *
-     * @param attribute the attribute which head is parsed from
-     * @param exception the thrown exception
-     * @throws Exception the exception
-     */
-    void onParsingHeadFailure(String attribute, Exception exception) throws 
Exception;
-
-    /**
-     * This method is called when there is a failure occurred while parsing 
InLongMsg body.
-     *
-     * @param body the body bytes which body is parsed from
-     * @param exception the thrown exception
-     * @throws Exception the exception
-     */
-    void onParsingBodyFailure(byte[] body, Exception exception) throws 
Exception;
-
-    /**
-     * This method is called when there is a failure occurred while converting 
head and body to row.
-     *
-     * @param head the head of row
-     * @param body the body of row
-     * @param exception the thrown exception
-     * @throws Exception the exception
-     */
-    void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, 
Exception exception) throws Exception;
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
deleted file mode 100644
index bb57bddfba..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of {@link FailureHandler} that ignores the failure.
- */
-public class IgnoreFailureHandler implements FailureHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(IgnoreFailureHandler.class);
-
-    @Override
-    public void onParsingHeadFailure(String attribute, Exception exception) {
-        LOG.warn("Cannot properly parse the head {}", attribute, exception);
-    }
-
-    @Override
-    public void onParsingBodyFailure(byte[] body, Exception exception) {
-        LOG.warn("Cannot properly parse the body: {}.", new String(body), 
exception);
-    }
-
-    @Override
-    public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, 
Exception exception) {
-        LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, 
exception);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        return o != null && getClass() == o.getClass();
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
deleted file mode 100644
index 0ec159f47e..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.apache.inlong.common.msg.InLongMsg;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The body deserialized from {@link InLongMsg}.
- */
-public class InLongMsgBody implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * The body of the record.
-     */
-    private final byte[] data;
-
-    /**
-     * The interface of the record.
-     */
-    private final String tid;
-
-    /**
-     * The fields extracted from the body.
-     */
-    private final List<String> fields;
-
-    /**
-     * The entries extracted from the body.
-     */
-    private final Map<String, String> entries;
-
-    public InLongMsgBody(
-            byte[] data,
-            String tid,
-            List<String> fields,
-            Map<String, String> entries) {
-        this.data = data;
-        this.tid = tid;
-        this.fields = fields;
-        this.entries = entries;
-    }
-
-    public byte[] getData() {
-        return data;
-    }
-
-    public String getTid() {
-        return tid;
-    }
-
-    public List<String> getFields() {
-        return fields;
-    }
-
-    public Map<String, String> getEntries() {
-        return entries;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
-        return Arrays.equals(data, inLongMsgBody.data);
-    }
-
-    @Override
-    public int hashCode() {
-        return Arrays.hashCode(data);
-    }
-
-    @Override
-    public String toString() {
-        return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", tid='" 
+ tid + '\''
-                + ", fields=" + fields + ", entries=" + entries + '}';
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
deleted file mode 100644
index 61e6f6234f..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.apache.inlong.common.msg.InLongMsg;
-
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * The head deserialized from {@link InLongMsg}.
- */
-public class InLongMsgHead implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * The attributes in the head.
-     */
-    private final Map<String, String> attributes;
-
-    /**
-     * The interface of the record.
-     */
-    private final String tid;
-
-    /**
-     * The time of the record.
-     */
-    private final Timestamp time;
-
-    /**
-     * The predefined fields extracted from the head.
-     */
-    private final List<String> predefinedFields;
-
-    public InLongMsgHead(
-            Map<String, String> attributes,
-            String tid,
-            Timestamp time,
-            List<String> predefinedFields) {
-        this.attributes = attributes;
-        this.tid = tid;
-        this.time = time;
-        this.predefinedFields = predefinedFields;
-    }
-
-    public Map<String, String> getAttributes() {
-        return attributes;
-    }
-
-    public String getTid() {
-        return tid;
-    }
-
-    public Timestamp getTime() {
-        return time;
-    }
-
-    public List<String> getPredefinedFields() {
-        return predefinedFields;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        InLongMsgHead that = (InLongMsgHead) o;
-        return Objects.equals(attributes, that.attributes)
-                && Objects.equals(tid, that.tid)
-                && Objects.equals(time, that.time)
-                && Objects.equals(predefinedFields, that.predefinedFields);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(attributes, tid, time, predefinedFields);
-    }
-
-    @Override
-    public String toString() {
-        return "InLongMsgHead{"
-                + "attributes=" + attributes
-                + ", tid='" + tid + '\''
-                + ", time=" + time
-                + ", predefinedFields=" + predefinedFields
-                + '}';
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
deleted file mode 100644
index bf14669bd1..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedValidator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.apache.inlong.sort.formats.base.TableFormatConstants;
-import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.DescriptorValidator;
-
-/**
- * Validator for mixed inlongmsg formats.
- */
-public class InLongMsgMixedValidator implements DescriptorValidator {
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        TableFormatForRowUtils.getValidateProperties(properties);
-        properties.validateString(TableFormatConstants.FORMAT_DELIMITER, true, 
1, 1);
-        properties.validateString(TableFormatConstants.FORMAT_ENTRY_DELIMITER, 
true, 1, 1);
-        properties.validateString(TableFormatConstants.FORMAT_KV_DELIMITER, 
true, 1, 1);
-        properties.validateString(TableFormatConstants.FORMAT_QUOTE_CHARACTER, 
true, 1, 1);
-        properties.validateBoolean(TableFormatConstants.FORMAT_IGNORE_ERRORS, 
true);
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
deleted file mode 100644
index be0a9354fd..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
-
-public class InLongMsgOptions {
-
-    private InLongMsgOptions() {
-    }
-
-    public static final ConfigOption<String> INNER_FORMAT =
-            ConfigOptions.key("inner.format")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Defines the format identifier for 
encoding attr data. \n"
-                            + "The identifier is used to discover a suitable 
format factory.");
-
-    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
-            ConfigOptions.key("ignore-parse-errors")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Optional flag to skip fields and rows 
with parse errors instead of failing;\n"
-                            + "fields are set to null in case of errors");
-
-    public static final ConfigOption<Boolean> CSV_IGNORE_PARSE_ERRORS =
-            ConfigOptions.key("csv.ignore-parse-errors")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
-                            + "The extra column will be skipped");
-
-    public static final ConfigOption<Boolean> CSV_IGNORE_TRAILING_UNMAPPABLE =
-            ConfigOptions.key("csv.ignore-trailing-unmappable")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Allows the case that real size exceeds 
the expected size.\n "
-                            + "The extra column will be skipped");
-
-    public static final ConfigOption<Boolean> 
CSV_INSERT_NULLS_FOR_MISSING_COLUMNS =
-            ConfigOptions.key("csv.insert-nulls-for-missing-columns")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("For missing columns, insert null.");
-
-    public static final ConfigOption<Boolean> CSV_EMPTY_STRING_AS_NULL =
-            ConfigOptions.key("csv.empty-string-as-null")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("if the string value is empty, make it as 
null");
-
-    public static void validateDecodingFormatOptions(ReadableConfig config) {
-        String innerFormat = config.get(INNER_FORMAT);
-        if (innerFormat == null) {
-            throw new ValidationException(
-                    INNER_FORMAT.key() + " shouldn't be null.");
-        }
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
deleted file mode 100644
index e33071d171..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgValidator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.apache.inlong.sort.formats.base.FormatDescriptorValidator;
-
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
-import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
-import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.validateSchema;
-
-/**
- * Validator for mixed inlongmsg formats.
- */
-public class InLongMsgValidator extends FormatDescriptorValidator {
-
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-
-        properties.validateString(FORMAT_DELIMITER, true, 1, 1);
-        properties.validateString(FORMAT_ENTRY_DELIMITER, true, 1, 1);
-        properties.validateString(FORMAT_KV_DELIMITER, true, 1, 1);
-        properties.validateString(FORMAT_ESCAPE_CHARACTER, true, 1, 1);
-        properties.validateString(FORMAT_QUOTE_CHARACTER, true, 1, 1);
-        properties.validateBoolean(FORMAT_IGNORE_ERRORS, true);
-
-        validateSchema(properties);
-    }
-}
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
deleted file mode 100644
index 3f4eb090fa..0000000000
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.formats.inlongmsg;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of {@link FailureHandler} that just throws the exception 
out.
- */
-public class NoOpFailureHandler implements FailureHandler {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(NoOpFailureHandler.class);
-
-    @Override
-    public void onParsingHeadFailure(String attribute, Exception exception) 
throws Exception {
-        LOG.error("Cannot properly parse the head {}", attribute, exception);
-        throw exception;
-    }
-
-    @Override
-    public void onParsingBodyFailure(byte[] body, Exception exception) throws 
Exception {
-        LOG.error("Cannot properly parse the body: {}.", new String(body), 
exception);
-        throw exception;
-    }
-
-    @Override
-    public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, 
Exception exception) throws Exception {
-        LOG.error("Cannot properly convert the InLongMsg ({}, {})", head, 
body, exception);
-        throw exception;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        return o != null && getClass() == o.getClass();
-    }
-}

Reply via email to