This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7148c1cfaa [INLONG-10784][Sort] Refactor the structure of 
inlongmsg-rowdata (#10786)
7148c1cfaa is described below

commit 7148c1cfaa6208116a776694564bd0506825eeb0
Author: XiaoYou201 <xiaoyou...@foxmail.com>
AuthorDate: Wed Aug 14 12:11:56 2024 +0800

    [INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata (#10786)
    
    * [INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata
    
    * [INLONG-10784][Sort] fix format
    
    * [INLONG-10784][Sort] fix format
    
    * [INLONG-10784][Sort] fix format
    
    * [INLONG-10784][Sort] Rename row module structure
    
    * Revert "[INLONG-10784][Sort] Rename row module structure"
    
    This reverts commit 9ee654784ca6e213b642a584e7e5d78572925366.
    
    * Revert "[INLONG-10784][Sort] fix format"
    
    This reverts commit 32dcc771c1c2a53f14602537e38f7922ca620d8c.
    
    * Revert "[INLONG-10784][Sort] fix format"
    
    This reverts commit dd45c0fc26d6f7073d6d214ef36dc0d95ac22903.
    
    * Revert "[INLONG-10784][Sort] fix format"
    
    This reverts commit 4d1e2b0e8b74effa096a7612d2aa954882a519a2.
    
    * Revert "[INLONG-10784][Sort] Refactor the structure of inlongmsg-rowdata"
    
    This reverts commit a69075dfbb77e28fb2e3b6b7b6e41cbf89eb06de.
    
    * [INLONG-10784][Sort] Refactor the Row module structure
---
 .../AbstractInLongMsgFormatDeserializer.java         |  5 ++++-
 .../AbstractInLongMsgMixedFormatConverter.java       |  2 +-
 .../AbstractInLongMsgMixedFormatDeserializer.java    |  4 +++-
 .../inlongmsg/{ => row}/InLongMsgDecodingFormat.java |  5 +++--
 .../{ => row}/InLongMsgDeserializationSchema.java    |  5 +++--
 .../inlongmsg/{ => row}/InLongMsgFormatFactory.java  |  2 +-
 .../{ => row}/InLongMsgMixedFormatConverter.java     |  2 +-
 .../InLongMsgMixedFormatConverterBuilder.java        | 10 +++++-----
 .../InLongMsgMixedFormatConverterValidator.java      |  4 ++--
 .../InLongMsgMixedFormatDeserializerValidator.java   |  2 +-
 .../{ => row}/InLongMsgMixedFormatFactory.java       |  2 +-
 .../InLongMsgTextMixedFormatDeserializerBuilder.java |  2 +-
 .../formats/inlongmsg/{ => row}/InLongMsgUtils.java  |  7 ++++++-
 .../org.apache.flink.table.factories.Factory         |  2 +-
 .../inlongmsg/InLongMsgFormatFactoryTest.java        |  3 ++-
 .../formats/inlongmsg/InLongMsgRowDataSerDeTest.java |  1 +
 .../formats/inlongmsgbinlog/InLongMsgBinlog.java     |  4 ++--
 .../InLongMsgBinlogFormatBuilder.java                |  8 ++++----
 .../InLongMsgBinlogFormatDeserializer.java           |  4 ++--
 .../InLongMsgBinlogFormatFactory.java                | 14 +++++++-------
 .../InLongMsgBinlogMixedFormatConverter.java         |  2 +-
 .../InLongMsgBinlogMixedFormatDeserializer.java      |  4 ++--
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java        | 20 ++++++++++----------
 .../inlongmsgbinlog/InLongMsgBinlogValidator.java    |  4 ++--
 .../InLongMsgBinlogFormatDeserializerTest.java       |  4 ++--
 .../sort/formats/inlongmsgcsv/InLongMsgCsv.java      |  6 +++---
 .../inlongmsgcsv/InLongMsgCsvFormatDeserializer.java | 16 ++++++++--------
 .../inlongmsgcsv/InLongMsgCsvFormatFactory.java      | 18 +++++++++---------
 .../InLongMsgCsvMixedFormatConverter.java            |  6 +++---
 .../InLongMsgCsvMixedFormatDeserializer.java         |  6 +++---
 .../sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java | 18 +++++++++---------
 .../formats/inlongmsgcsv/InLongMsgCsvValidator.java  |  2 +-
 .../InLongMsgCsvFormatDeserializerTest.java          |  4 ++--
 .../inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java  |  2 +-
 .../inlong/sort/formats/inlongmsgkv/InLongMsgKv.java |  6 +++---
 .../inlongmsgkv/InLongMsgKvFormatDeserializer.java   | 16 ++++++++--------
 .../inlongmsgkv/InLongMsgKvFormatFactory.java        | 18 +++++++++---------
 .../inlongmsgkv/InLongMsgKvMixedFormatConverter.java |  6 +++---
 .../InLongMsgKvMixedFormatDeserializer.java          |  6 +++---
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java   | 18 +++++++++---------
 .../formats/inlongmsgkv/InLongMsgKvValidator.java    |  2 +-
 .../inlongmsgkv/InLongMsgKvFormatFactoryTest.java    |  4 ++--
 .../formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java   |  2 +-
 .../InLongMsgTlogCsvFormatDeserializer.java          | 12 ++++++------
 .../InLongMsgTlogCsvFormatFactory.java               | 16 ++++++++--------
 .../InLongMsgTlogCsvMixedFormatConverter.java        |  6 +++---
 .../InLongMsgTlogCsvMixedFormatDeserializer.java     |  6 +++---
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java      | 12 ++++++------
 .../inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java  |  2 +-
 .../InLongMsgTlogCsvFormatDeserializerTest.java      |  4 ++--
 .../InLongMsgTlogCsvFormatFactoryTest.java           |  2 +-
 .../formats/inlongmsgtlogkv/InLongMsgTlogKv.java     |  2 +-
 .../InLongMsgTlogKvFormatDeserializer.java           | 12 ++++++------
 .../InLongMsgTlogKvFormatFactory.java                | 16 ++++++++--------
 .../InLongMsgTlogKvMixedFormatConverter.java         |  6 +++---
 .../InLongMsgTlogKvMixedFormatDeserializer.java      |  6 +++---
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java        | 12 ++++++------
 .../inlongmsgtlogkv/InLongMsgTlogKvValidator.java    |  2 +-
 .../InLongMsgTlogKvFormatDeserializerTest.java       |  4 ++--
 .../InLongMsgTlogKvFormatFactoryTest.java            |  2 +-
 60 files changed, 207 insertions(+), 193 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
similarity index 94%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
index 79378468a5..4c264ffd4b 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgFormatDeserializer.java
@@ -15,10 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
index 165f6532fb..96c91776ff 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatConverter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
similarity index 91%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
index d2d0b7ca3f..00ff3ecf3b 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/AbstractInLongMsgMixedFormatDeserializer.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
+
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
 import javax.annotation.Nonnull;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
index 779ff6b8e8..f3471596c7 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDecodingFormat.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema.MetadataConverter;
 
 import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
similarity index 97%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
index d3a27c62be..fd5d01f609 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgDeserializationSchema.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
 
 import com.google.common.base.Objects;
 import org.apache.flink.api.common.functions.util.ListCollector;
@@ -152,7 +153,7 @@ public class InLongMsgDeserializationSchema implements 
DeserializationSchema<Row
         return Objects.hashCode(deserializationSchema, metadataConverters, 
producedTypeInfo, ignoreErrors);
     }
 
-    interface MetadataConverter extends Serializable {
+    public interface MetadataConverter extends Serializable {
 
         Object read(InLongMsgHead head);
     }
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
index c7caa43290..7b68dbe7ec 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgFormatFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
similarity index 95%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
index b5819503e0..bbade1c3b1 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
similarity index 87%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
index 2e289d4faa..5719eea0c1 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterBuilder.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 
@@ -25,10 +25,10 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_I
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 
 /**
  * The builder for {@link AbstractInLongMsgMixedFormatConverter}s.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
similarity index 91%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
index d793dff7fc..d06345e23a 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatConverterValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatConverterValidator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.sort.formats.base.FormatDescriptorValidator;
 
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
 
 /**
  * The validator for the properties of {@link 
AbstractInLongMsgMixedFormatConverter}s.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
similarity index 97%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
index d04a7e32b5..5360813bff 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatDeserializerValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatDeserializerValidator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.sort.formats.base.FormatDescriptorValidator;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
index 13bd4004ef..a181f8d813 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMixedFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgMixedFormatFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import java.util.Map;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
index 10f74065e0..7d0773d9d4 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgTextMixedFormatDeserializerBuilder.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgTextMixedFormatDeserializerBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
similarity index 98%
rename from 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
rename to 
inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
index 1a85eddd1e..00391b2911 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
@@ -15,13 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.formats.inlongmsg;
+package org.apache.inlong.sort.formats.inlongmsg.row;
 
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatConstants;
 import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler;
 import org.apache.inlong.sort.formats.util.StringUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 0aa99381d8..a0bdce2d50 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
+org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgFormatFactory
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
index 6232a13ebc..1f88226a98 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactoryTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgDeserializationSchema.MetadataConverter;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
index 64985985ab..0429e6f4aa 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgFormatFactory;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.flink.api.common.functions.util.ListCollector;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
index cea8c011d0..7d9beb1402 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
@@ -31,8 +31,8 @@ import java.util.Map;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
index 57c795548a..b13cdd822c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatBuilder.java
@@ -23,10 +23,10 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_INCLUDE_UPDATE_BEFORE;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_METADATA_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 866a6be6ce..5128d51be5 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.sort.formats.inlongmsgbinlog;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
index 45b9992837..893a70f460 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -37,8 +37,8 @@ import java.util.Map;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.getDataRowFormatInfo;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
index 9bd4cf3c61..9f65d086d2 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsgbinlog;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
index 59d4959991..76ba972640 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.sort.formats.inlongmsgbinlog;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 40f398d6c5..59dbd84667 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -42,16 +42,16 @@ import java.util.Set;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
 import static org.apache.inlong.sort.formats.base.TableFormatUtils.getType;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
 
 public class InLongMsgBinlogUtils {
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
index d8957ee142..90c1424bd5 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogValidator.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_INCLUDE_UPDATE_BEFORE;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.FORMAT_METADATA_FIELD_NAME;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
index 8fc0950709..6035073272 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializerTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.types.Row;
 import org.junit.Test;
 
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtils.DEFAULT_METADATA_FIELD_NAME;
 import static org.junit.Assert.assertEquals;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index 96e807a327..61b368eec0 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -22,9 +22,9 @@ import 
org.apache.inlong.sort.formats.base.TextFormatDescriptor;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index e43266e358..ab6a748056 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -40,12 +40,12 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
index 8e21aa1d51..f62a0e0bed 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -42,10 +42,10 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LI
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsv.FORMAT_TYPE_VALUE;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index ea9f691cc6..6b4400f548 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.sort.formats.inlongmsgcsv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index 194ca2fa85..f78d094d77 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sort.formats.inlongmsgcsv;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index caa82e8faf..c347f5861b 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -35,15 +35,15 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
index d478e4ac06..a08303f6c1 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvValidator.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORMAT_DELETE_HEAD_DELIMITER;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index a6aaa9c8d0..8508aaa90e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -44,8 +44,8 @@ import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
 import static org.junit.Assert.assertEquals;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
index e0d576d604..6dd23a904a 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
index 1a1b93cc16..934ecc66a7 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
@@ -23,9 +23,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 
 /**
  * Format descriptor for KVs.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 24a8f09ec0..1fecb43d5f 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -42,12 +42,12 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_L
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
index 405069112f..be440b9522 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java
@@ -21,11 +21,11 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import 
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -44,10 +44,10 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LI
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
 
 /**
  * Table format factory for providing configured instances of 
InLongMsgKv-to-row
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index 6eef05212a..a70ff5a1a3 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.sort.formats.inlongmsgkv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
index cceefd517e..ba85accfc8 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sort.formats.inlongmsgkv;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 5d5d3b860a..faeb11fa18 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -32,15 +32,15 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TID;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
index e290784c09..a88dfe4fca 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
 
 /**
  * The validator for {@link InLongMsgKv}.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
index 16a7adc8b0..9d8eb59735 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java
@@ -35,8 +35,8 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.Map;
 
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
index 168ba7b5ec..ffd54bcdf3 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsv.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 
 import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index e569d0f43f..eddbbccbc1 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -38,10 +38,10 @@ import java.util.Objects;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 
 /**
  * The deserializer for the records in InLongMsgTlogCsv format.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
index 5f67880178..c7f8366a89 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactory.java
@@ -21,11 +21,11 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import 
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -42,9 +42,9 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IG
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
 
 /**
  * Table format factory for providing configured instances of 
InLongMsgTlogCsv-to-row
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
index 27a1157dcd..4f587fb10b 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
index 11f86d204e..1cfb39827e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 9e0e35952b..d9a699b174 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -35,12 +35,12 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
index 1e9f297004..f3ed57267c 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvValidator.java
@@ -22,7 +22,7 @@ import 
org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
 
 /**
  * The validator for {@link InLongMsgTlogCsv}.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 19c18ec49c..7d66b9ba1f 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -41,8 +41,8 @@ import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static org.junit.Assert.assertEquals;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
index 06cde665a4..9cf4d5b88b 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
index eb7ec66d35..8c324dc1d6 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKv.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
 import org.apache.inlong.sort.formats.base.TextFormatDescriptor;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 9f3df07d5d..0e4f9f37e5 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -19,11 +19,11 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -42,10 +42,10 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_K
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
index 611a3dc303..88fe090f28 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactory.java
@@ -21,11 +21,11 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import 
org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatDeserializerValidator;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatFactory;
 
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.TableFormatFactoryBase;
@@ -44,9 +44,9 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getDataRowFormatInfo;
 import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKv.FORMAT_TYPE_VALUE;
 
 /**
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
index 417343351e..7099fd995e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatConverter;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgMixedFormatConverterBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.types.Row;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
index 84a6823e35..b756a02d99 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 
-import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
-import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.AbstractInLongMsgMixedFormatDeserializer;
+import 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgTextMixedFormatDeserializerBuilder;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.descriptors.DescriptorProperties;
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 298cc650e0..b2cecd8248 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -31,12 +31,12 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.getPredefinedFields;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseAttr;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseDateTime;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.parseEpochTime;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
index 8b163a4bb2..43315097ef 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvValidator.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.validateInLongMsgSchema;
 
 /**
  * The validator for {@link InLongMsgTlogKv}.
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index d361313916..598d7bd5dd 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -41,8 +41,8 @@ import java.util.Map;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
-import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static 
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
 import static org.junit.Assert.assertEquals;
 
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
index 5022c64b87..8922dc8a55 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatFactoryTest.java
@@ -25,7 +25,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.StringFormatInfo;
 import org.apache.inlong.sort.formats.base.TableFormatDeserializer;
 import org.apache.inlong.sort.formats.base.TableFormatForRowUtils;
-import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+import org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;

Reply via email to