This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f0cff8a9cf [INLONG-9563][Sort] Support rowdata way of sort message kv 
format (#9630)
f0cff8a9cf is described below

commit f0cff8a9cfae433071a37d45f9b9d4feb1ff9e1e
Author: baomingyu <baomingy...@163.com>
AuthorDate: Sun Feb 4 21:04:26 2024 +0800

    [INLONG-9563][Sort] Support rowdata way of sort message kv format (#9630)
    
    Co-authored-by: Charles Zhang <dockerzh...@apache.org>
---
 .../sort/formats/csv/CsvFormatFactoryTest.java     |  10 +-
 .../format-rowdata/format-rowdata-kv/pom.xml       | 142 +++++++++
 .../apache/inlong/sort/formats/kv/KvCommons.java   |  67 +++++
 .../inlong/sort/formats/kv/KvFormatBuilder.java    |  66 +++++
 .../inlong/sort/formats/kv/KvFormatFactory.java    | 216 ++++++++++++++
 .../formats/kv/KvRowDataDeserializationSchema.java | 245 ++++++++++++++++
 .../formats/kv/KvRowDataSerializationSchema.java   | 237 +++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +
 .../sort/formats/kv/KvFormatFactoryTest.java       | 150 ++++++++++
 .../kv/KvRowDataDeserializationSchemaTest.java     | 322 +++++++++++++++++++++
 .../kv/KvRowDataSerializationSchemaTest.java       | 290 +++++++++++++++++++
 .../apache/inlong/sort/formats/kv/KvUtilsTest.java | 275 ++++++++++++++++++
 .../src/test/resources/log4j-test.properties       |  27 ++
 inlong-sort/sort-formats/format-rowdata/pom.xml    |   2 +
 14 files changed, 2060 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
index 666340fdc9..8ce9024c2a 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvFormatFactoryTest.java
@@ -115,11 +115,11 @@ public class CsvFormatFactoryTest extends TestLogger {
         options.put("buffer-size", "1000");
 
         options.put("format", CsvFormatFactory.IDENTIFIER);
-        options.put("InLong-CSV.row.format.info", 
FormatUtils.marshall(testFormatInfo));
-        options.put("InLong-CSV.format.field-delimiter", ";");
-        options.put("InLong-CSV.format.quote-character", "'");
-        options.put("InLong-CSV.format.escape-character", "\\");
-        options.put("InLong-CSV.format.null-literal", "n/a");
+        options.put("inlong-csv.row.format.info", 
FormatUtils.marshall(testFormatInfo));
+        options.put("inlong-csv.format.field-delimiter", ";");
+        options.put("inlong-csv.format.quote-character", "'");
+        options.put("inlong-csv.format.escape-character", "\\");
+        options.put("inlong-csv.format.null-literal", "n/a");
         return options;
     }
 
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
new file mode 100644
index 0000000000..3254fe23a7
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>format-rowdata</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-format-rowdata-kv</artifactId>
+    <packaging>jar</packaging>
+    <name>Apache InLong - Sort Format-RowData-KV</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        
<inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+
+        <!-- core dependencies -->
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-rowdata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>${flink.jackson.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <profile>
+            <id>japicmp-report</id>
+            <activation>
+                <property>
+                    <name>japicmp-report</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>com.github.siom79.japicmp</groupId>
+                        <artifactId>japicmp-maven-plugin</artifactId>
+                        <configuration>
+                            <parameter>
+                                
<breakBuildOnBinaryIncompatibleModifications>false</breakBuildOnBinaryIncompatibleModifications>
+                                
<breakBuildOnSourceIncompatibleModifications>false</breakBuildOnSourceIncompatibleModifications>
+                            </parameter>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+        <profile>
+            <id>japicmp-check</id>
+            <activation>
+                <property>
+                    <name>!japicmp-report</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>com.github.siom79.japicmp</groupId>
+                        <artifactId>japicmp-maven-plugin</artifactId>
+                        <configuration>
+                            <parameter>
+                                <excludes>
+                                    
<exclude>org.apache.inlong.sort.formats.kv.KvRowDataDeserializationSchema</exclude>
+                                    
<exclude>org.apache.inlong.sort.formats.kv.KvRowDataSerializationSchema</exclude>
+                                </excludes>
+                            </parameter>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java
new file mode 100644
index 0000000000..1ef60e5691
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvCommons.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.TextFormatOptions;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Shared validation helpers for the KV format.
+ */
+public class KvCommons {
+
+    /**
+     * Verifies that each single-character option that is present in {@code tableOptions} holds
+     * exactly one character. The delimiter options are passed through
+     * {@link StringEscapeUtils#unescapeJava(String)} before their length is checked; the quote
+     * and escape characters are checked as given.
+     *
+     * @param tableOptions the format options to validate
+     * @throws ValidationException if a present option does not resolve to a single character
+     */
+    static void validateFormatOptions(ReadableConfig tableOptions) {
+        validateCharacterVal(tableOptions, TextFormatOptions.FIELD_DELIMITER, true);
+        validateCharacterVal(tableOptions, TextFormatOptions.KV_DELIMITER, true);
+        validateCharacterVal(tableOptions, TextFormatOptions.KV_ENTRY_DELIMITER, true);
+        validateCharacterVal(tableOptions, TextFormatOptions.QUOTE_CHARACTER, false);
+        validateCharacterVal(tableOptions, TextFormatOptions.ESCAPE_CHARACTER, false);
+    }
+
+    /**
+     * Checks a single option. Nothing happens when the option is absent.
+     *
+     * @param tableOptions the format options holding the value
+     * @param option the option to check
+     * @param unescape whether Java escape sequences are resolved before the length check
+     */
+    private static void validateCharacterVal(
+            ReadableConfig tableOptions,
+            ConfigOption<String> option,
+            boolean unescape) {
+        if (tableOptions.getOptional(option).isPresent()) {
+            final String rawValue = tableOptions.get(option);
+            final String effectiveValue = unescape ? StringEscapeUtils.unescapeJava(rawValue) : rawValue;
+
+            if (effectiveValue.length() != 1) {
+                // The message reports the value exactly as configured, not the unescaped form.
+                throw new ValidationException(
+                        String.format(
+                                "Option '%s.%s' must be a String with single character, but was: %s",
+                                KvFormatFactory.IDENTIFIER, option.key(), rawValue));
+            }
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java
new file mode 100644
index 0000000000..7f42e43b6a
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER;
+
+/**
+ * Base builder for the KV serialization and deserialization schemas. It adds the entry
+ * delimiter and the key-value delimiter to the settings inherited from
+ * {@link TextFormatBuilder}.
+ *
+ * @param <T> the concrete builder type returned by the fluent setters
+ */
+public abstract class KvFormatBuilder<T extends KvFormatBuilder> extends TextFormatBuilder<T> {
+
+    protected char entryDelimiter = DEFAULT_ENTRY_DELIMITER;
+    protected char kvDelimiter = DEFAULT_KV_DELIMITER;
+
+    public KvFormatBuilder(RowFormatInfo rowFormatInfo) {
+        super(rowFormatInfo);
+    }
+
+    /**
+     * Sets the entry delimiter.
+     *
+     * @param entryDelimiter the delimiter to use
+     * @return this builder
+     */
+    @SuppressWarnings("unchecked")
+    public T setEntryDelimiter(char entryDelimiter) {
+        final T self = (T) this;
+        this.entryDelimiter = entryDelimiter;
+        return self;
+    }
+
+    /**
+     * Sets the key-value delimiter.
+     *
+     * @param kvDelimiter the delimiter to use
+     * @return this builder
+     */
+    @SuppressWarnings("unchecked")
+    public T setKvDelimiter(char kvDelimiter) {
+        final T self = (T) this;
+        this.kvDelimiter = kvDelimiter;
+        return self;
+    }
+
+    /**
+     * Applies the inherited configuration first, then overrides the two delimiters with the
+     * values found in {@code descriptorProperties}, if any.
+     *
+     * @param descriptorProperties the properties to read the settings from
+     * @return this builder
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public T configure(DescriptorProperties descriptorProperties) {
+        super.configure(descriptorProperties);
+
+        descriptorProperties
+                .getOptionalCharacter(FORMAT_KV_ENTRY_DELIMITER)
+                .ifPresent(delimiter -> setEntryDelimiter(delimiter));
+        descriptorProperties
+                .getOptionalCharacter(FORMAT_KV_DELIMITER)
+                .ifPresent(delimiter -> setKvDelimiter(delimiter));
+
+        return (T) this;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
new file mode 100644
index 0000000000..dff4adff04
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvFormatFactory.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.base.TableFormatOptions;
+import org.apache.inlong.sort.formats.base.TextFormatOptions;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatOptions.IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
+import static 
org.apache.inlong.sort.formats.base.TextFormatOptions.ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TextFormatOptions.NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TextFormatOptions.QUOTE_CHARACTER;
+
+/**
+ * Table format factory for providing configured instances of the KV-to-RowData serializer
+ * and deserializer.
+ */
+public class KvFormatFactory
+        implements
+            SerializationFormatFactory,
+            DeserializationFormatFactory {
+
+    public static final String IDENTIFIER = "inlong-kv";
+    public static final String KV_PREFIX = IDENTIFIER + ".";
+
+    /**
+     * Validates the format options and returns an insert-only encoding format whose runtime
+     * encoder is a {@link KvRowDataSerializationSchema} configured from those options.
+     */
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+            DynamicTableFactory.Context context,
+            ReadableConfig formatOptions) {
+        // Check the options against this factory's declared option sets, then the character options.
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        KvCommons.validateFormatOptions(formatOptions);
+
+        return new EncodingFormat<SerializationSchema<RowData>>() {
+
+            @Override
+            public ChangelogMode getChangelogMode() {
+                return ChangelogMode.insertOnly();
+            }
+
+            @Override
+            public SerializationSchema<RowData> createRuntimeEncoder(
+                    DynamicTableSink.Context context,
+                    DataType dataType) {
+                // The declared row format is projected onto the data type requested by the sink.
+                final RowFormatInfo declaredRowFormatInfo =
+                        deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO));
+                final RowFormatInfo projectedRowFormatInfo =
+                        TableFormatForRowDataUtils.projectRowFormatInfo(declaredRowFormatInfo, dataType);
+
+                final KvRowDataSerializationSchema.Builder builder =
+                        new KvRowDataSerializationSchema.Builder(projectedRowFormatInfo);
+                configureSerializationSchema(formatOptions, builder);
+                return builder.build();
+            }
+        };
+    }
+
+    /**
+     * Validates the format options and returns an insert-only decoding format whose runtime
+     * decoder is a {@link KvRowDataDeserializationSchema} configured from those options.
+     */
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context,
+            ReadableConfig formatOptions) {
+        // Check the options against this factory's declared option sets, then the character options.
+        FactoryUtil.validateFactoryOptions(this, formatOptions);
+        KvCommons.validateFormatOptions(formatOptions);
+
+        return new DecodingFormat<DeserializationSchema<RowData>>() {
+
+            @Override
+            public ChangelogMode getChangelogMode() {
+                return ChangelogMode.insertOnly();
+            }
+
+            @Override
+            public DeserializationSchema<RowData> createRuntimeDecoder(
+                    DynamicTableSource.Context context,
+                    DataType dataType) {
+                final RowFormatInfo rowFormatInfo =
+                        deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO));
+
+                final KvRowDataDeserializationSchema.Builder builder =
+                        new KvRowDataDeserializationSchema.Builder(
+                                rowFormatInfo,
+                                context.createTypeInformation(dataType));
+                configureDeserializationSchema(formatOptions, builder);
+                return builder.build();
+            }
+        };
+    }
+
+    /**
+     * Returns the identifier under which this format is registered, i.e. {@link #IDENTIFIER}.
+     */
+    @Override
+    public String factoryIdentifier() {
+        return KvFormatFactory.IDENTIFIER;
+    }
+
+    /**
+     * Returns the options this factory declares as required: the row format info, the charset
+     * and the two KV delimiters.
+     */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> required = new HashSet<>();
+        required.add(TextFormatOptions.CHARSET);
+        required.add(TextFormatOptions.KV_DELIMITER);
+        required.add(TextFormatOptions.KV_ENTRY_DELIMITER);
+        required.add(TableFormatOptions.ROW_FORMAT_INFO);
+        return required;
+    }
+
+    /**
+     * Returns the options this factory declares as optional: the escape and quote characters,
+     * the null literal and the ignore-errors flag.
+     */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(IGNORE_ERRORS);
+        optional.add(NULL_LITERAL);
+        optional.add(QUOTE_CHARACTER);
+        optional.add(ESCAPE_CHARACTER);
+        return optional;
+    }
+
+    // ------------------------------------------------------------------------
+    // Utilities
+    // ------------------------------------------------------------------------
+
+    /**
+     * Copies every format option that is present in {@code formatOptions} onto the given
+     * serialization schema builder. Options that are not present leave the builder untouched.
+     */
+    private static void configureSerializationSchema(
+            ReadableConfig formatOptions,
+            KvRowDataSerializationSchema.Builder schemaBuilder) {
+
+        formatOptions
+                .getOptional(TextFormatOptions.CHARSET)
+                .ifPresent(charset -> schemaBuilder.setCharset(charset));
+
+        // The delimiters are unescaped first; only the first resulting character is used.
+        formatOptions
+                .getOptional(TextFormatOptions.KV_ENTRY_DELIMITER)
+                .ifPresent(delimiter -> schemaBuilder.setEntryDelimiter(
+                        StringEscapeUtils.unescapeJava(delimiter).charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.KV_DELIMITER)
+                .ifPresent(delimiter -> schemaBuilder.setKvDelimiter(
+                        StringEscapeUtils.unescapeJava(delimiter).charAt(0)));
+
+        // The escape and quote characters are taken as given, without unescaping.
+        formatOptions
+                .getOptional(TextFormatOptions.ESCAPE_CHARACTER)
+                .ifPresent(escape -> schemaBuilder.setEscapeCharacter(escape.charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.QUOTE_CHARACTER)
+                .ifPresent(quote -> schemaBuilder.setQuoteCharacter(quote.charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.NULL_LITERAL)
+                .ifPresent(nullLiteral -> schemaBuilder.setNullLiteral(nullLiteral));
+
+        formatOptions
+                .getOptional(TableFormatOptions.IGNORE_ERRORS)
+                .ifPresent(ignoreErrors -> schemaBuilder.setIgnoreErrors(ignoreErrors));
+    }
+
+    /**
+     * Copies every format option that is present in {@code formatOptions} onto the given
+     * deserialization schema builder. Options that are not present leave the builder untouched.
+     */
+    private static void configureDeserializationSchema(
+            ReadableConfig formatOptions,
+            KvRowDataDeserializationSchema.Builder schemaBuilder) {
+
+        formatOptions
+                .getOptional(TextFormatOptions.CHARSET)
+                .ifPresent(charset -> schemaBuilder.setCharset(charset));
+
+        // The delimiters are unescaped first; only the first resulting character is used.
+        formatOptions
+                .getOptional(TextFormatOptions.KV_ENTRY_DELIMITER)
+                .ifPresent(delimiter -> schemaBuilder.setEntryDelimiter(
+                        StringEscapeUtils.unescapeJava(delimiter).charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.KV_DELIMITER)
+                .ifPresent(delimiter -> schemaBuilder.setKvDelimiter(
+                        StringEscapeUtils.unescapeJava(delimiter).charAt(0)));
+
+        // The escape and quote characters are taken as given, without unescaping.
+        formatOptions
+                .getOptional(TextFormatOptions.ESCAPE_CHARACTER)
+                .ifPresent(escape -> schemaBuilder.setEscapeCharacter(escape.charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.QUOTE_CHARACTER)
+                .ifPresent(quote -> schemaBuilder.setQuoteCharacter(quote.charAt(0)));
+
+        formatOptions
+                .getOptional(TextFormatOptions.NULL_LITERAL)
+                .ifPresent(nullLiteral -> schemaBuilder.setNullLiteral(nullLiteral));
+
+        formatOptions
+                .getOptional(TableFormatOptions.IGNORE_ERRORS)
+                .ifPresent(ignoreErrors -> schemaBuilder.setIgnoreErrors(ignoreErrors));
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
new file mode 100644
index 0000000000..d0bb4f1735
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
+
+/**
+ * Deserialization schema from KV string bytes to Flink Table & SQL internal data structures.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a Map and converts it to a {@link RowData}.</p>
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link IOException}s.</p>
+ */
+public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    @Nonnull
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    @Nonnull
+    private final String charset;
+
+    @Nonnull
+    private final Character entryDelimiter;
+
+    @Nonnull
+    private final Character kvDelimiter;
+
+    @Nullable
+    private final Character escapeChar;
+
+    @Nullable
+    private final Character quoteChar;
+
+    @Nullable
+    private final String nullLiteral;
+
+    private final FieldToRowDataConverters.FieldToRowDataConverter[] 
converters;
+
+    /**
+     * Creates a schema that uses the default charset, delimiters, escape character, quote
+     * character and null literal.
+     *
+     * @param rowFormatInfo the format information of the row's fields
+     * @param producedTypeInfo the type information returned by {@link #getProducedType()}
+     */
+    public KvRowDataDeserializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull TypeInformation<RowData> producedTypeInfo) {
+        this(rowFormatInfo, producedTypeInfo, DEFAULT_CHARSET, DEFAULT_ENTRY_DELIMITER, DEFAULT_KV_DELIMITER,
+                DEFAULT_ESCAPE_CHARACTER, DEFAULT_QUOTE_CHARACTER, DEFAULT_NULL_LITERAL);
+    }
+
+    /**
+     * Creates a schema with explicit text settings and the default ignore-errors behaviour
+     * ({@code DEFAULT_IGNORE_ERRORS}).
+     *
+     * @param rowFormatInfo the format information of the row's fields
+     * @param producedTypeInfo the type information returned by {@link #getProducedType()}
+     * @param charset the name of the charset used to decode the message bytes
+     * @param entryDelimiter the entry delimiter
+     * @param kvDelimiter the key-value delimiter
+     * @param escapeChar the escape character, may be {@code null}
+     * @param quoteChar the quote character, may be {@code null}
+     * @param nullLiteral the literal that represents a null value, may be {@code null}
+     */
+    public KvRowDataDeserializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull TypeInformation<RowData> producedTypeInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral) {
+        this(rowFormatInfo, producedTypeInfo, charset, entryDelimiter, kvDelimiter,
+                escapeChar, quoteChar, nullLiteral, DEFAULT_IGNORE_ERRORS);
+    }
+
+    /**
+     * Creates a fully specified schema and pre-builds one field converter per field of
+     * {@code rowFormatInfo}.
+     *
+     * @param rowFormatInfo the format information of the row's fields
+     * @param producedTypeInfo the type information returned by {@link #getProducedType()}
+     * @param charset the name of the charset used to decode the message bytes
+     * @param entryDelimiter the entry delimiter
+     * @param kvDelimiter the key-value delimiter
+     * @param escapeChar the escape character, may be {@code null}
+     * @param quoteChar the quote character, may be {@code null}
+     * @param nullLiteral the literal that represents a null value, may be {@code null}
+     * @param ignoreErrors the ignore-errors flag handed to the base schema
+     */
+    public KvRowDataDeserializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull TypeInformation<RowData> producedTypeInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            boolean ignoreErrors) {
+        // ignoreErrors is a primitive and can never be null, so it carries no @Nullable annotation.
+        super(ignoreErrors);
+        this.rowFormatInfo = rowFormatInfo;
+        this.producedTypeInfo = producedTypeInfo;
+        this.charset = charset;
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+
+        // Converters are indexed like the fields of rowFormatInfo and reused for every message.
+        converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+                .map(formatInfo -> FieldToRowDataConverters.createConverter(
+                        TableFormatForRowDataUtils.deriveLogicalType(formatInfo)))
+                .toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
+    }
+
+    /**
+     * Decodes one message into a {@link RowData}.
+     *
+     * <p>The bytes are decoded with the configured charset and split into a key-to-text map.
+     * Each field declared in the row format is then looked up by name, deserialized and
+     * converted, and stored at the field's declared position.</p>
+     *
+     * @param bytes the raw message
+     * @return the row built from the message
+     * @throws IOException if splitting, deserializing or converting the message fails
+     */
+    @Override
+    public RowData deserializeInternal(byte[] bytes) throws IOException {
+        String text = new String(bytes, Charset.forName(charset));
+        try {
+            Map<String, String> fieldTexts =
+                    splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar);
+
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+            GenericRowData rowData = new GenericRowData(fieldFormatInfos.length);
+            for (int i = 0; i < fieldFormatInfos.length; i++) {
+                String fieldName = fieldNames[i];
+                FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+                // A key that is missing from the message yields a null text here.
+                String fieldText = fieldTexts.get(fieldName);
+
+                Object field = deserializeBasicField(
+                        fieldName,
+                        fieldFormatInfo,
+                        fieldText,
+                        nullLiteral);
+                rowData.setField(i, converters[i].convert(field));
+            }
+            return rowData;
+        } catch (Exception e) {
+            // Only ordinary failures are wrapped. JVM Errors (OutOfMemoryError and the like)
+            // must propagate instead of being reported as a bad record.
+            throw new IOException(
+                    String.format("Could not properly deserialize kv. Text=[%s].", text), e);
+        }
+    }
+
+    /**
+     * Always returns {@code false}: no row marks the end of the stream.
+     */
+    @Override
+    public boolean isEndOfStream(RowData rowData) {
+        final boolean endOfStream = false;
+        return endOfStream;
+    }
+
+    /**
+     * Returns the type information that was passed to the constructor.
+     */
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return this.producedTypeInfo;
+    }
+
+    /**
+     * Builder for {@link KvRowDataDeserializationSchema}.
+     */
+    public static class Builder extends KvFormatBuilder<Builder> {
+
+        private final TypeInformation<RowData> producedTypeInfo;
+
+        /**
+         * @param rowFormatInfo the format information of the row's fields
+         * @param producedTypeInfo the type information the built schema will report
+         */
+        public Builder(RowFormatInfo rowFormatInfo, TypeInformation<RowData> producedTypeInfo) {
+            super(rowFormatInfo);
+            this.producedTypeInfo = producedTypeInfo;
+        }
+
+        /**
+         * Creates the schema from the values collected by this builder.
+         *
+         * @return a new {@link KvRowDataDeserializationSchema}
+         */
+        public KvRowDataDeserializationSchema build() {
+            // Forward the ignore-errors flag as well; otherwise a value set through
+            // setIgnoreErrors is silently replaced by the default.
+            // NOTE(review): assumes the inherited field is named ignoreErrors, like the other
+            // TextFormatBuilder fields used here - confirm against TextFormatBuilder.
+            return new KvRowDataDeserializationSchema(
+                    rowFormatInfo,
+                    producedTypeInfo,
+                    charset,
+                    entryDelimiter,
+                    kvDelimiter,
+                    escapeChar,
+                    quoteChar,
+                    nullLiteral,
+                    ignoreErrors);
+        }
+    }
+
+    /**
+     * Two schemas are equal when they have the same class, the base class considers them
+     * equal, and their row format, charset, delimiters, escape character, quote character
+     * and null literal all match.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
+            return false;
+        }
+
+        final KvRowDataDeserializationSchema other = (KvRowDataDeserializationSchema) o;
+        if (!rowFormatInfo.equals(other.rowFormatInfo)) {
+            return false;
+        }
+        if (!charset.equals(other.charset)) {
+            return false;
+        }
+        if (!entryDelimiter.equals(other.entryDelimiter)) {
+            return false;
+        }
+        if (!kvDelimiter.equals(other.kvDelimiter)) {
+            return false;
+        }
+        // The remaining fields are nullable, hence the null-safe comparison.
+        if (!Objects.equals(escapeChar, other.escapeChar)) {
+            return false;
+        }
+        if (!Objects.equals(quoteChar, other.quoteChar)) {
+            return false;
+        }
+        return Objects.equals(nullLiteral, other.nullLiteral);
+    }
+
+    /**
+     * Combines the base class hash with the same fields that {@link #equals(Object)} compares.
+     */
+    @Override
+    public int hashCode() {
+        final Object[] components = {
+                super.hashCode(),
+                rowFormatInfo,
+                charset,
+                entryDelimiter,
+                kvDelimiter,
+                escapeChar,
+                quoteChar,
+                nullLiteral
+        };
+        return Arrays.hashCode(components);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java
new file mode 100644
index 0000000000..2b684bbe7f
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchema.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.DefaultSerializationSchema;
+import org.apache.inlong.sort.formats.base.RowDataToFieldConverters;
+import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+
+import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.charset.Charset;
+import java.util.Objects;
+
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField;
+import static org.apache.inlong.sort.formats.util.StringUtils.concatKv;
+
+/**
+ * Serialization schema that serializes an object of Flink Table & SQL 
internal data structure into
+ * a KV format bytes.
+ *
+ * <p> Serialize the input row into a string in a form of "k1=v1&k2=v2" and 
converts it into
+ * bytes.</p>
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using
+ * {@link KvRowDataDeserializationSchema}.</p>
+ */
+public class KvRowDataSerializationSchema extends 
DefaultSerializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvRowDataSerializationSchema.class);
+
+    @Nonnull
+    private final RowFormatInfo rowFormatInfo;
+
+    @Nonnull
+    private final String charset;
+
+    @Nonnull
+    private final Character entryDelimiter;
+
+    @Nonnull
+    private final Character kvDelimiter;
+
+    @Nullable
+    private final Character escapeChar;
+
+    @Nullable
+    private final Character quoteChar;
+
+    @Nullable
+    private final String nullLiteral;
+
+    private final RowDataToFieldConverters.RowFieldConverter[] 
rowFieldConverters;
+
+    public KvRowDataSerializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo) {
+        this(
+                rowFormatInfo,
+                DEFAULT_CHARSET,
+                DEFAULT_ENTRY_DELIMITER,
+                DEFAULT_KV_DELIMITER,
+                DEFAULT_ESCAPE_CHARACTER,
+                DEFAULT_QUOTE_CHARACTER,
+                DEFAULT_NULL_LITERAL);
+    }
+
+    public KvRowDataSerializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral) {
+        this(rowFormatInfo, charset, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar, nullLiteral,
+                DEFAULT_IGNORE_ERRORS);
+    }
+
+    public KvRowDataSerializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            @Nullable boolean ignoreErrors) {
+        super(ignoreErrors);
+        this.rowFormatInfo = rowFormatInfo;
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+        rowFieldConverters = new 
RowDataToFieldConverters.RowFieldConverter[fieldFormatInfos.length];
+        for (int i = 0; i < rowFieldConverters.length; i++) {
+            rowFieldConverters[i] = 
RowDataToFieldConverters.createNullableRowFieldConverter(
+                    
TableFormatForRowDataUtils.deriveLogicalType(fieldFormatInfos[i]));
+        }
+
+        this.charset = charset;
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
+        this.escapeChar = escapeChar;
+        this.quoteChar = quoteChar;
+        this.nullLiteral = nullLiteral;
+    }
+
+    @Override
+    public byte[] serializeInternal(RowData rowData) {
+        if (rowData == null) {
+            return null;
+        }
+
+        try {
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = 
rowFormatInfo.getFieldFormatInfos();
+
+            if (rowData.getArity() != fieldFormatInfos.length) {
+                LOG.warn("The number of fields mismatches: expected=[{}], 
actual=[{}]. Row=[{}].",
+                        fieldNames.length, rowData.getArity(), rowData);
+            }
+
+            String[] fieldTexts = new String[fieldNames.length];
+
+            // the extra fields will be dropped
+            for (int i = 0; i < fieldNames.length; i++) {
+                if (i >= rowData.getArity()) {
+                    fieldTexts[i] = nullLiteral == null ? "" : nullLiteral;
+                } else {
+                    Object field = rowFieldConverters[i].convert(rowData, i);
+                    String fieldText = serializeBasicField(
+                            fieldNames[i],
+                            fieldFormatInfos[i],
+                            field,
+                            nullLiteral);
+
+                    fieldTexts[i] = fieldText;
+                }
+            }
+
+            String text = concatKv(
+                    fieldNames,
+                    fieldTexts,
+                    entryDelimiter,
+                    kvDelimiter,
+                    escapeChar,
+                    quoteChar);
+
+            return text.getBytes(Charset.forName(charset));
+        } catch (Throwable t) {
+            throw new RuntimeException(
+                    String.format("Could not properly serialize kv. Row=[%s]. 
FormatInfo=[%s].",
+                            rowData, rowFormatInfo),
+                    t);
+        }
+    }
+
+    /**
+     * Builder for {@link KvRowDataSerializationSchema}.
+     */
+    public static class Builder extends KvFormatBuilder<Builder> {
+
+        public Builder(RowFormatInfo rowFormatInfo) {
+            super(rowFormatInfo);
+        }
+
+        public KvRowDataSerializationSchema build() {
+            return new KvRowDataSerializationSchema(
+                    rowFormatInfo,
+                    charset,
+                    entryDelimiter,
+                    kvDelimiter,
+                    escapeChar,
+                    quoteChar,
+                    nullLiteral,
+                    ignoreErrors);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        if (!super.equals(o)) {
+            return false;
+        }
+
+        KvRowDataSerializationSchema that = (KvRowDataSerializationSchema) o;
+        return Objects.equals(rowFormatInfo, that.rowFormatInfo) &&
+                Objects.equals(charset, that.charset) &&
+                Objects.equals(entryDelimiter, that.entryDelimiter) &&
+                Objects.equals(kvDelimiter, that.kvDelimiter) &&
+                Objects.equals(escapeChar, that.escapeChar) &&
+                Objects.equals(quoteChar, that.quoteChar) &&
+                Objects.equals(nullLiteral, that.nullLiteral);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), rowFormatInfo, charset, 
entryDelimiter, kvDelimiter,
+                escapeChar, quoteChar, nullLiteral);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..b72e47ef5f
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.kv.KvFormatFactory
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java
new file mode 100644
index 0000000000..13bc3a2c5f
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvFormatFactoryTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.TableFormatConstants;
+import org.apache.inlong.sort.formats.base.TableFormatOptions;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.factories.utils.FactoryMocks;
+import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.PHYSICAL_DATA_TYPE;
+import static org.apache.flink.table.factories.utils.FactoryMocks.SCHEMA;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for the {@link KvFormatFactory}. */
+public class KvFormatFactoryTest {
+
+    private static final RowFormatInfo TEST_FORMAT_SCHEMA =
+            new RowFormatInfo(
+                    new String[]{"a", "b", "c"},
+                    new FormatInfo[]{
+                            StringFormatInfo.INSTANCE,
+                            IntFormatInfo.INSTANCE,
+                            BooleanFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testSerDeSchema() {
+        final Map<String, String> tableOptions = getAllOptions();
+        testSerializationSchema(tableOptions);
+        testDeserializationSchema(tableOptions);
+    }
+
+    private Map<String, String> getModifiedOptions(
+            Consumer<Map<String, String>> optionModifier) {
+        Map<String, String> options = getAllOptions();
+        optionModifier.accept(options);
+        return options;
+    }
+
+    private void testDeserializationSchema(Map<String, String> tableOptions) {
+        LogicalType rowType = 
TableFormatUtils.deriveLogicalType(TEST_FORMAT_SCHEMA);
+        TypeInformation<RowData> producedType = InternalTypeInfo.of(rowType);
+        final KvRowDataDeserializationSchema expectedDeser =
+                new KvRowDataDeserializationSchema.Builder(TEST_FORMAT_SCHEMA, 
producedType)
+                        .setEntryDelimiter('&')
+                        .setKvDelimiter('=')
+                        .setCharset(StandardCharsets.ISO_8859_1.name())
+                        .setEscapeCharacter('\\')
+                        .setQuoteCharacter('\"')
+                        .setNullLiteral("n/a")
+                        .setIgnoreErrors(true)
+                        .build();
+
+        DeserializationSchema<RowData> actualDeser = 
createDeserializationSchema(tableOptions);
+        assertEquals(expectedDeser, actualDeser);
+    }
+
+    private void testSerializationSchema(Map<String, String> tableOptions) {
+        final KvRowDataSerializationSchema expectedSer =
+                new KvRowDataSerializationSchema.Builder(TEST_FORMAT_SCHEMA)
+                        .setEntryDelimiter('&')
+                        .setKvDelimiter('=')
+                        .setCharset(StandardCharsets.ISO_8859_1.name())
+                        .setEscapeCharacter('\\')
+                        .setQuoteCharacter('\"')
+                        .setNullLiteral("n/a")
+                        .setIgnoreErrors(true)
+                        .build();
+        SerializationSchema<RowData> actualSer = 
createSerializationSchema(tableOptions);
+        assertEquals(expectedSer, actualSer);
+    }
+
+    private DeserializationSchema<RowData> createDeserializationSchema(
+            Map<String, String> options) {
+        final DynamicTableSource actualSource = 
createTableSource(FactoryMocks.SCHEMA, options);
+        assert actualSource instanceof 
TestDynamicTableFactory.DynamicTableSourceMock;
+        TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+        return sourceMock.valueFormat.createRuntimeDecoder(
+                ScanRuntimeProviderContext.INSTANCE, PHYSICAL_DATA_TYPE);
+    }
+
+    private SerializationSchema<RowData> createSerializationSchema(Map<String, 
String> options) {
+        final DynamicTableSink actualSink = createTableSink(SCHEMA, options);
+        assert actualSink instanceof 
TestDynamicTableFactory.DynamicTableSinkMock;
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+        return sinkMock.valueFormat.createRuntimeEncoder(null, 
PHYSICAL_DATA_TYPE);
+    }
+
+    private Map<String, String> getAllOptions() {
+        final Map<String, String> options = new HashMap<>();
+        options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+        options.put("target", "MyTarget");
+        options.put("buffer-size", "1000");
+
+        options.put("format", KvFormatFactory.IDENTIFIER);
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatOptions.ROW_FORMAT_INFO.key(),
+                FormatUtils.marshall(TEST_FORMAT_SCHEMA));
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_KV_ENTRY_DELIMITER, "&");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_KV_DELIMITER, "=");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_CHARSET, "ISO-8859-1");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_IGNORE_ERRORS, "true");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_ESCAPE_CHARACTER, "\\");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_QUOTE_CHARACTER, "\"");
+        options.put(KvFormatFactory.KV_PREFIX + 
TableFormatConstants.FORMAT_NULL_LITERAL, "n/a");
+
+        return options;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
new file mode 100644
index 0000000000..a0350905e3
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KvRowDataDeserializationSchema}.
+ */
+public class KvRowDataDeserializationSchemaTest {
+
+    /** Four-field row format (one int followed by three strings) shared by the row-level tests. */
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"f1", "f2", "f3", "f4"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testNormal() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
+        };
+
+        testBasicDeserialization(config, StringFormatInfo.INSTANCE, StringData.fromString("hello"), "f=hello");
+        testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, true, "f=true");
+        testBasicDeserialization(config, ByteFormatInfo.INSTANCE, (byte) 124, "f=124");
+        testBasicDeserialization(config, ShortFormatInfo.INSTANCE, (short) 10000, "f=10000");
+        testBasicDeserialization(config, IntFormatInfo.INSTANCE, 1234567, "f=1234567");
+        testBasicDeserialization(config, LongFormatInfo.INSTANCE, 12345678910L, "f=12345678910");
+        testBasicDeserialization(config, FloatFormatInfo.INSTANCE, 0.33333334f, "f=0.33333334");
+        testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, 0.33333333332, "f=0.33333333332");
+        testBasicDeserialization(config, DecimalFormatInfo.INSTANCE,
+                DecimalData.fromBigDecimal(new BigDecimal("1234.0000000000000000000000001"), 10, 0),
+                "f=1234.0000000000000000000000001");
+        testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
+                Date.valueOf("2020-03-22").toLocalDate().toEpochDay(), "f=22/03/2020");
+        testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"),
+                Time.valueOf("11:12:13").toLocalTime().toSecondOfDay() * 1000, "f=13/12/11");
+        testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"),
+                TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 11:12:13")), "f=22/03/2020 11:12:13");
+    }
+
+    @Test
+    public void testNullLiteral() throws Exception {
+        String nullLiteral = "n/a";
+        String nullField = "f=n/a";
+        Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> builder.setNullLiteral(nullLiteral);
+
+        testBasicDeserialization(config, StringFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, BooleanFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, ByteFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, ShortFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, IntFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, LongFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, FloatFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null, nullField);
+        testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), null, nullField);
+        testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null, nullField);
+        testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), null, nullField);
+    }
+
+    @Test
+    public void testDelimiter() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config =
+                builder -> builder.setEntryDelimiter('|').setKvDelimiter(',');
+
+        testRowDeserialization(
+                config,
+                createRow(10, "aa", "bb", "cc"),
+                "f1,10|f2,aa|f3,bb|f4,cc".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testEscape() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config =
+                builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+        // An escaped entry delimiter stays part of the value.
+        testRowDeserialization(
+                config,
+                createRow(10, "field1&field2", "field3", "field4"),
+                "f1=10&f2=field1\\&field2&f3=field3&f4=field4".getBytes(StandardCharsets.UTF_8));
+
+        // An escaped escape character yields a literal backslash.
+        testRowDeserialization(
+                config,
+                createRow(10, "field1\\", "field2", "field3"),
+                "f1=10&f2=field1\\\\&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_8));
+
+        // An escaped quote character yields a literal quote.
+        testRowDeserialization(
+                config,
+                createRow(10, "field1\"", "field2", "field3"),
+                "f1=10&f2=field1\\\"&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testQuote() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config =
+                builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+        // An entry delimiter inside quotes stays part of the value.
+        testRowDeserialization(
+                config,
+                createRow(10, "field1&field2", "field3", "field4"),
+                "f1=10&f2=\"field1&field2\"&f3=field3&f4=field4".getBytes(StandardCharsets.UTF_8));
+
+        // A backslash inside quotes is expected to be kept literally.
+        testRowDeserialization(
+                config,
+                createRow(10, "field1\\", "field2", "field3"),
+                "f1=10&f2=\"field1\\\"&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testExtractSpecificKeys() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config =
+                builder -> builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+        testRowDeserialization(
+                config,
+                createRow(10, "field1", "field2", "field3"),
+                "f1=10&f2=field1&f3=field2&f4=field3&f5=field4".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testCharset() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config =
+                builder -> builder.setCharset(StandardCharsets.UTF_16.name());
+
+        testRowDeserialization(
+                config,
+                createRow(10, "aa", "bb", "cc"),
+                "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16));
+    }
+
+    @Test(expected = Exception.class)
+    public void testErrors() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
+        };
+
+        // "na" is not a valid int, so deserialization is expected to fail.
+        testRowDeserialization(
+                config,
+                createRow(null, "field1", "field2", "field3"),
+                "f1=na&f2=field1&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testMissingField() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
+        };
+
+        // The key f1 is absent from the input, so the first field is expected to be null.
+        testRowDeserialization(
+                config,
+                createRow(null, "aa", "bb", "cc"),
+                "f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testExtraField() throws Exception {
+        Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
+        };
+
+        // The key f5 is not part of the row format and is expected to be ignored.
+        testRowDeserialization(
+                config,
+                createRow(10, "aa", "bb", "cc"),
+                "f1=10&f2=aa&f3=bb&f4=cc&f5=dd".getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds an expected row matching {@link #TEST_ROW_INFO}.
+     *
+     * @param f1 value of the int field, may be null
+     * @param f2 value of the first string field
+     * @param f3 value of the second string field
+     * @param f4 value of the third string field
+     */
+    private static GenericRowData createRow(Integer f1, String f2, String f3, String f4) {
+        GenericRowData rowData = new GenericRowData(4);
+        rowData.setField(0, f1);
+        rowData.setField(1, StringData.fromString(f2));
+        rowData.setField(2, StringData.fromString(f3));
+        rowData.setField(3, StringData.fromString(f4));
+        return rowData;
+    }
+
+    /**
+     * Deserializes {@code text} with a single-field row format named "f" and checks the value of
+     * that field.
+     *
+     * @param config hook to customise the builder before the schema is created
+     * @param basicFormatInfo format of the single field
+     * @param expectedRecord expected value of the field, may be null
+     * @param text the KV text to deserialize
+     */
+    private static <T> void testBasicDeserialization(
+            Consumer<KvRowDataDeserializationSchema.Builder> config,
+            BasicFormatInfo<T> basicFormatInfo,
+            Object expectedRecord,
+            String text) throws IOException {
+        RowFormatInfo rowFormatInfo =
+                new RowFormatInfo(
+                        new String[]{"f"},
+                        new FormatInfo[]{basicFormatInfo});
+        // The produced type describes the whole row, so derive it from the row format rather than
+        // from the single field's format.
+        LogicalType logicalType = TableFormatUtils.deriveLogicalType(rowFormatInfo);
+        TypeInformation<RowData> typeInformation = InternalTypeInfo.of(logicalType);
+        KvRowDataDeserializationSchema.Builder builder =
+                new KvRowDataDeserializationSchema.Builder(rowFormatInfo, typeInformation);
+        config.accept(builder);
+
+        KvRowDataDeserializationSchema deserializer = builder.build();
+
+        GenericRowData row = (GenericRowData) deserializer.deserialize(text.getBytes(StandardCharsets.UTF_8));
+        assertEquals(1, row.getArity());
+        assertEquals(expectedRecord, row.getField(0));
+    }
+
+    /**
+     * Deserializes {@code bytes} with {@link #TEST_ROW_INFO} and checks the resulting row.
+     *
+     * @param config hook to customise the builder before the schema is created
+     * @param expectedRow the row the deserializer is expected to produce
+     * @param bytes the encoded KV message
+     */
+    private void testRowDeserialization(
+            Consumer<KvRowDataDeserializationSchema.Builder> config,
+            RowData expectedRow,
+            byte[] bytes) throws Exception {
+        LogicalType rowType = TableFormatUtils.deriveLogicalType(TEST_ROW_INFO);
+        TypeInformation<RowData> producedTypeInfo = InternalTypeInfo.of(rowType);
+        KvRowDataDeserializationSchema.Builder builder =
+                new KvRowDataDeserializationSchema.Builder(TEST_ROW_INFO, producedTypeInfo);
+        config.accept(builder);
+
+        KvRowDataDeserializationSchema deserializer = builder.build();
+
+        RowData row = deserializer.deserialize(bytes);
+        assertEquals(expectedRow, row);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java
new file mode 100644
index 0000000000..02616b61a9
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataSerializationSchemaTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.apache.inlong.sort.formats.common.BasicFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
+import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.ShortFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.function.Consumer;
+
+import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
+import static org.apache.flink.table.data.StringData.fromString;
+import static org.apache.flink.table.data.TimestampData.fromTimestamp;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KvRowDataSerializationSchema}.
+ */
+public class KvRowDataSerializationSchemaTest {
+
+    private static final RowFormatInfo TEST_ROW_INFO =
+            new RowFormatInfo(
+                    new String[]{"f1", "f2", "f3", "f4"},
+                    new FormatInfo[]{
+                            IntFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE,
+                            StringFormatInfo.INSTANCE
+                    });
+
+    @Test
+    public void testNormal() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> {
+        };
+
+        testBasicSerialization(config, StringFormatInfo.INSTANCE, "hello", 
"f=hello");
+        testBasicSerialization(config, BooleanFormatInfo.INSTANCE, true, 
"f=true");
+        testBasicSerialization(config, ByteFormatInfo.INSTANCE, (byte) 124, 
"f=124");
+        testBasicSerialization(config, ShortFormatInfo.INSTANCE, (short) 
10000, "f=10000");
+        testBasicSerialization(config, IntFormatInfo.INSTANCE, 1234567, 
"f=1234567");
+        testBasicSerialization(config, LongFormatInfo.INSTANCE, 12345678910L, 
"f=12345678910");
+        testBasicSerialization(config, FloatFormatInfo.INSTANCE, 0.33333334f, 
"f=0.33333334");
+        testBasicSerialization(config, DoubleFormatInfo.INSTANCE, 
0.33333333332, "f=0.33333333332");
+        testBasicSerialization(config, DecimalFormatInfo.INSTANCE, new 
BigDecimal("1234.0000000000000000000000001"),
+                "f=1234.0000000000000000000000001");
+        testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"), 
Date.valueOf("2020-03-22"), "f=22/03/2020");
+        testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"), 
Time.valueOf("11:12:13"), "f=13/12/11");
+        testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy 
hh:mm:ss"),
+                Timestamp.valueOf("2020-03-22 11:12:13"), "f=22/03/2020 
11:12:13");
+    }
+
+    @Test
+    public void testNullIteral() {
+        String nullLiteral = "n/a";
+        String nullField = "f=n/a";
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> 
builder.setNullLiteral(nullLiteral);
+
+        testBasicSerialization(config, StringFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, BooleanFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, ByteFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, ShortFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, IntFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, LongFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, FloatFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, DoubleFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, DecimalFormatInfo.INSTANCE, null, 
nullField);
+        testBasicSerialization(config, new DateFormatInfo("dd/MM/yyyy"), null, 
nullField);
+        testBasicSerialization(config, new TimeFormatInfo("ss/mm/hh"), null, 
nullField);
+        testBasicSerialization(config, new TimestampFormatInfo("dd/MM/yyyy 
hh:mm:ss"), null, nullField);
+    }
+
+    @Test
+    public void testDelimiter() {
+        Consumer<KvRowDataSerializationSchema.Builder> config =
+                builder -> builder.setEntryDelimiter('|').setKvDelimiter(',');
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1"),
+                        fromString("field2"),
+                        fromString("field3")),
+                "f1,10|f2,field1|f3,field2|f4,field3".getBytes());
+
+    }
+
+    @Test
+    public void testEscape() {
+        Consumer<KvRowDataSerializationSchema.Builder> config =
+                builder -> 
builder.setEscapeCharacter('\\').setQuoteCharacter('\"');
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1&field2"),
+                        fromString("field3"),
+                        fromString("field4")),
+                "f1=10&f2=field1\\&field2&f3=field3&f4=field4".getBytes());
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1\\"),
+                        fromString("field2"),
+                        fromString("field3")),
+                "f1=10&f2=field1\\\\&f3=field2&f4=field3".getBytes());
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1\""),
+                        fromString("field2"),
+                        fromString("field3")),
+                "f1=10&f2=field1\\\"&f3=field2&f4=field3".getBytes());
+    }
+
+    @Test
+    public void testQuote() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> 
builder.setQuoteCharacter('\"');
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1&field2"),
+                        fromString("field3"),
+                        fromString("field4")),
+                "f1=10&f2=field1\"&\"field2&f3=field3&f4=field4".getBytes());
+    }
+
+    @Test
+    public void testCharset() {
+        Consumer<KvRowDataSerializationSchema.Builder> config =
+                builder -> builder.setCharset(StandardCharsets.UTF_16.name());
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1"),
+                        fromString("field2"),
+                        fromString("field3")),
+                
"f1=10&f2=field1&f3=field2&f4=field3".getBytes(StandardCharsets.UTF_16));
+    }
+
+    @Test
+    public void testNullFields() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> {
+        };
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1"),
+                        null,
+                        fromString("field3")),
+                "f1=10&f2=field1&f3=&f4=field3".getBytes());
+    }
+
+    @Test
+    public void testMoreFields() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> {
+        };
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(10,
+                        fromString("field1"),
+                        fromString("field2"),
+                        fromString("field3"),
+                        fromString("field4")),
+                "f1=10&f2=field1&f3=field2&f4=field3".getBytes());
+    }
+
+    @Test
+    public void testLessFields() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> {
+        };
+
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        10,
+                        fromString("field1"),
+                        fromString("field2")),
+                "f1=10&f2=field1&f3=field2&f4=".getBytes());
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testErrors() {
+        Consumer<KvRowDataSerializationSchema.Builder> config = builder -> {
+        };
+        testRowSerialization(
+                config,
+                GenericRowData.of(
+                        fromString("na"),
+                        fromString("field1"),
+                        fromString("field2"),
+                        fromString("field3")),
+                "f1=&f2=field1&f3=field2&f4=field3".getBytes());
+    }
+
+    private static <T> void testBasicSerialization(
+            Consumer<KvRowDataSerializationSchema.Builder> config,
+            BasicFormatInfo<T> basicFormatInfo,
+            T record,
+            String expectedText) {
+        RowFormatInfo rowFormatInfo =
+                new RowFormatInfo(
+                        new String[]{"f"},
+                        new FormatInfo[]{basicFormatInfo});
+
+        KvRowDataSerializationSchema.Builder builder =
+                new KvRowDataSerializationSchema.Builder(rowFormatInfo);
+        config.accept(builder);
+
+        KvRowDataSerializationSchema serializer = builder.build();
+
+        GenericRowData rowData = new GenericRowData(1);
+        if (record instanceof String) {
+            rowData.setField(0, fromString((String) record));
+        } else if (record instanceof BigDecimal) {
+            rowData.setField(0, fromBigDecimal((BigDecimal) record, 30, 25));
+        } else if (record instanceof Timestamp) {
+            rowData.setField(0, fromTimestamp((Timestamp) record));
+        } else if (record instanceof Date) {
+            rowData.setField(0, ((Date) record).toLocalDate().toEpochDay());
+        } else if (record instanceof Time) {
+            rowData.setField(0, ((Time) record).toLocalTime().toSecondOfDay() 
* 1000);
+        } else {
+            rowData.setField(0, record);
+        }
+
+        String text = new String(serializer.serialize(rowData));
+        assertEquals(expectedText, text);
+    }
+
+    private static void testRowSerialization(
+            Consumer<KvRowDataSerializationSchema.Builder> config,
+            RowData row,
+            byte[] expectedBytes) {
+        KvRowDataSerializationSchema.Builder builder =
+                new KvRowDataSerializationSchema.Builder(TEST_ROW_INFO);
+        config.accept(builder);
+
+        KvRowDataSerializationSchema serializer = builder.build();
+        byte[] bytes = serializer.serialize(row);
+        assertArrayEquals(expectedBytes, bytes);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
new file mode 100644
index 0000000000..4ad02f88e8
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.kv;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.sort.formats.util.StringUtils.concatKv;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for kv splitting and concatenating.
+ */
+public class KvUtilsTest {
+
+    @Test
+    public void testSplitNormal() {
+        List<Map<String, String>> list =
+                splitKv("f1=a\nf1=b", '&', '=', '\\', '\"', '\n');
+        List<Map<String, String>> expectedList = new ArrayList<>();
+        expectedList.add(new HashMap<String, String>() {
+
+            {
+                put("f1", "a");
+            }
+        });
+        expectedList.add(new HashMap<String, String>() {
+
+            {
+                put("f1", "b");
+            }
+        });
+        assertEquals(list, expectedList);
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("f1=a&f2=b&f3=c", '&', '=', null, null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("f1=&f2=b&f3=c", '&', '=', null, null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a");
+                        put("f2", "b");
+                        put("f3", "");
+                    }
+                },
+                splitKv("f1=a&f2=b&f3=", '&', '=', null, null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("=f1", "a");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("\\=f1=a&f2=b&f3=c", '&', '=', '\\', null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("&f1", "a");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("\\&f1=a&f2=b&f3=c", '&', '=', '\\', null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("&f1", "a");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("\"&f1\"=a&f2=b&f3=c", '&', '=', '\\', '\"'));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a&");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("f1=a\\&&f2=b&f3=c", '&', '=', '\\', null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a\\");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("f1=a\\\\&f2=b&f3=c", '&', '=', '\\', null));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a&f2=b");
+                        put("f3", "c");
+                        put("f4", "d");
+                    }
+                },
+                splitKv("f1=a\"&f2=\"b&f3=c&f4=d", '&', '=', '\\', '\"'));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "atest\\test");
+                        put("f2", "b");
+                        put("f3", "c");
+                    }
+                },
+                splitKv("f1=a\"test\\test\"&f2=b&f3=c", '&', '=', '\\', '\"'));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "a");
+                        put("f2", "\"b");
+                        put("f3", "c\"");
+                        put("f4", "d");
+                    }
+                },
+                splitKv("f1=a&f2=\\\"b&f3=c\\\"&f4=d", '&', '=', '\\', '\"'));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("f1", "b");
+                    }
+                },
+                splitKv("f1=a&f1=b", '&', '=', '\\', '\"'));
+
+        assertEquals(
+                new HashMap<String, String>() {
+
+                    {
+                        put("", "a");
+                        put("f", "");
+                    }
+                },
+                splitKv("=a&f=", '&', '=', '\\', '\"'));
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSplitNestedValue() {
+        splitKv("f1=a=a&f2=b&f3=c", '&', '=', '\\', '\"');
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSplitUnclosedEscaping() {
+        splitKv("f1=a&f2=b\\", '&', '=', '\\', '\"');
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSplitUnclosedQuoting() {
+        splitKv("f1=a&f2=b\"", '&', '=', '\\', '\"');
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSplitDanglingKey1() {
+        splitKv("f1", '&', '=', null, null);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testSplitDanglingKey2() {
+        splitKv("f1&f2=3", '&', '=', null, null);
+    }
+
+    @Test
+    public void testConcatNormal() {
+        assertEquals(
+                "f1=a&f2=b&f3=c&f4=d",
+                concatKv(
+                        new String[]{"f1", "f2", "f3", "f4"},
+                        new String[]{"a", "b", "c", "d"},
+                        '&', '=', null, null));
+
+        assertEquals(
+                "f1\\&=a&f2=\\&b&f3=c&f4=d",
+                concatKv(
+                        new String[]{"f1&", "f2", "f3", "f4"},
+                        new String[]{"a", "&b", "c", "d"},
+                        '&', '=', '\\', '\"'));
+
+        assertEquals(
+                "f1=a&f2=\\\\b&f3=c&f4=d",
+                concatKv(
+                        new String[]{"f1", "f2", "f3", "f4"},
+                        new String[]{"a", "\\b", "c", "d"},
+                        '&', '=', '\\', '\"'));
+
+        assertEquals(
+                "f1=a&f2=\\\"b&f3=c&f4=d",
+                concatKv(
+                        new String[]{"f1", "f2", "f3", "f4"},
+                        new String[]{"a", "\"b", "c", "d"},
+                        '&', '=', '\\', '\"'));
+
+        assertEquals(
+                "f1\"&\"=a&f2=\"&\"b&f3=c&f4=d",
+                concatKv(
+                        new String[]{"f1&", "f2", "f3", "f4"},
+                        new String[]{"a", "&b", "c", "d"},
+                        '&', '=', null, '\"'));
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testConcatNoEscapingAndQuoting() {
+        concatKv(
+                new String[]{"f1", "f2", "f3", "f4"},
+                new String[]{"&a", "&b", "&c", "&d"},
+                '&', '=', null, null);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testConcatNoEscaping() {
+        concatKv(
+                new String[]{"f1", "f2", "f3", "f4"},
+                new String[]{"a", "\"b", "c", "d"},
+                '&', '=', null, '\"');
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000000..881dc0609b
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index 0334976986..83ee0e56d4 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -33,6 +33,8 @@
 
     <modules>
         <module>format-rowdata-base</module>
+        <module>format-rowdata-csv</module>
+        <module>format-rowdata-kv</module>
         <module>format-rowdata-json</module>
         <module>format-inlongmsg-rowdata-base</module>
         <module>format-inlongmsg-rowdata-binlog</module>


Reply via email to