This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a58040909 [INLONG-9791][Sort] Return null instead of throwing an 
exception when deserialization by type fails (#9792)
5a58040909 is described below

commit 5a58040909423aebd25a2bf40df80a23480bf047
Author: baomingyu <baomingy...@163.com>
AuthorDate: Tue Mar 12 11:22:30 2024 +0800

    [INLONG-9791][Sort] Return null instead of throwing an exception when 
deserialization by type fails (#9792)
---
 .../inlong/sort/formats/base/TableFormatUtils.java |  9 ++++--
 .../sort/formats/inlongmsg/FailureHandler.java     | 20 ++++++++++++
 .../formats/inlongmsg/IgnoreFailureHandler.java    | 10 ++++++
 .../sort/formats/inlongmsg/NoOpFailureHandler.java | 11 +++++++
 .../sort/formats/base/TableFormatUtilsTest.java    | 10 +++---
 .../formats/csv/CsvDeserializationSchemaTest.java  | 11 ++++---
 .../InLongMsgCsvFormatDeserializerTest.java        | 20 +++++++++---
 .../InLongMsgKvFormatDeserializerTest.java         |  2 +-
 .../InLongMsgTlogCsvFormatDeserializerTest.java    |  2 +-
 .../InLongMsgTlogKvFormatDeserializerTest.java     |  2 +-
 .../formats/kv/KvDeserializationSchemaTest.java    |  2 +-
 .../InLongMsgCsvFormatDeserializerTest.java        | 35 +++++++++++++-------
 .../InLongMsgKvFormatDeserializerTest.java         |  2 +-
 .../InLongMsgTlogCsvFormatDeserializerTest.java    |  2 +-
 .../InLongMsgTlogKvFormatDeserializerTest.java     |  2 +-
 .../formats/base/DefaultDeserializationSchema.java | 37 ++++++++++------------
 .../sort/formats/base/TableFormatUtilsTest.java    | 10 +++---
 .../csv/CsvRowDataDeserializationSchema.java       |  8 ++---
 .../csv/CsvRowDataDeserializationSchemaTest.java   |  9 ++++--
 .../json/JsonRowDataDeserializationSchema.java     | 12 +++++--
 .../formats/json/FieldToRowDataConvertersTest.java |  2 ++
 .../json/JsonRowDataDeserializationSchemaTest.java |  4 ---
 .../formats/json/JsonRowDataSerDeTestBase.java     |  2 ++
 .../formats/json/RowDataToFieldConvertersTest.java |  6 ++--
 .../formats/kv/KvRowDataDeserializationSchema.java | 10 +++---
 .../kv/KvRowDataDeserializationSchemaTest.java     |  2 +-
 26 files changed, 162 insertions(+), 80 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index d83108de39..430b89b32f 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -92,6 +92,8 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
@@ -111,6 +113,8 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SC
  */
 public class TableFormatUtils {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(TableFormatUtils.class);
+
     /**
      * Returns the {@link DeserializationSchema} described by the given
      * properties.
@@ -567,9 +571,10 @@ public class TableFormatUtils {
         try {
             return ((BasicFormatInfo<?>) 
fieldFormatInfo).deserialize(fieldText);
         } catch (Exception e) {
-            throw new RuntimeException("Could not properly deserialize the "
-                    + "text " + fieldText + " for field " + fieldName + ".", 
e);
+            LOG.warn("Could not properly deserialize the " + "text "
+                    + fieldText + " for field " + fieldName + ".", e);
         }
+        return null;
     }
 
     /**
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index 7dff44fa10..6bdbf36103 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -24,6 +24,25 @@ import java.io.Serializable;
  */
 public interface FailureHandler extends Serializable {
 
+    /**
+     * This method is called when there is a failure occurred while parsing to 
check is
+     * or not parse failure.
+     *
+     */
+    default boolean isIgnoreFailure() {
+        return false;
+    };
+
+    /**
+     * This method is called when there is a failure occurred while parsing 
non-InLong message.
+     *
+     * @param msg the msg byte
+     * @param exception the thrown exception
+     * @throws Exception the exception
+     */
+    default void onParsingMsgFailure(Object msg, Exception exception) throws 
Exception {
+    };
+
     /**
      * This method is called when there is a failure occurred while parsing 
InLongMsg head.
      *
@@ -51,4 +70,5 @@ public interface FailureHandler extends Serializable {
      * @throws Exception the exception
      */
     void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, 
Exception exception) throws Exception;
+
 }
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
index 6ac0b956f2..b27fefc891 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
@@ -27,6 +27,11 @@ public class IgnoreFailureHandler implements FailureHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IgnoreFailureHandler.class);
 
+    @Override
+    public void onParsingMsgFailure(Object msg, Exception exception) {
+        LOG.error("Could not properly deserialize msg=[{}].", msg, exception);
+    };
+
     @Override
     public void onParsingHeadFailure(String attribute, Exception exception) {
         LOG.warn("Cannot properly parse the head {}", attribute, exception);
@@ -42,6 +47,11 @@ public class IgnoreFailureHandler implements FailureHandler {
         LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, 
exception);
     }
 
+    @Override
+    public boolean isIgnoreFailure() {
+        return true;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
index 66eeb19e69..d049920be1 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
@@ -27,6 +27,12 @@ public class NoOpFailureHandler implements FailureHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NoOpFailureHandler.class);
 
+    @Override
+    public void onParsingMsgFailure(Object msg, Exception t) throws Exception {
+        LOG.error("Could not properly serialize msg=[{}].", msg, t);
+        throw t;
+    }
+
     @Override
     public void onParsingHeadFailure(String attribute, Exception exception) 
throws Exception {
         LOG.error("Cannot properly parse the head {}", attribute, exception);
@@ -45,6 +51,11 @@ public class NoOpFailureHandler implements FailureHandler {
         throw exception;
     }
 
+    @Override
+    public boolean isIgnoreFailure() {
+        return false;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
 
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 9aada97b24..189fda1e0e 100644
--- 
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -20,13 +20,13 @@ package org.apache.inlong.sort.formats.base;
 import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link TableFormatUtils}.
@@ -174,23 +174,23 @@ public class TableFormatUtilsTest {
         assertEquals(1, result1);
 
         try {
-            deserializeBasicField(
+            Object result2 = deserializeBasicField(
                     "f",
                     IntFormatInfo.INSTANCE,
                     "",
                     "n/a");
-            fail("The method is expected to throw an exception.");
+            Assert.assertEquals(null, result2);
         } catch (Exception e) {
             // ignored
         }
 
-        Object result2 =
+        Object result3 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "n/a",
                         "n/a");
-        assertNull(result2);
+        assertNull(result3);
     }
 
     @Test
diff --git 
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
index c7cf93fe9c..9ddc0c4a74 100644
--- 
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java
@@ -75,9 +75,11 @@ public class CsvDeserializationSchemaTest {
         testBasicDeserialization(config, LongFormatInfo.INSTANCE, 
12345678910L, "12345678910");
         testBasicDeserialization(config, FloatFormatInfo.INSTANCE, 
0.33333334f, "0.33333334");
         testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, 
0.33333333332, "0.33333333332");
-        testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, new 
BigDecimal("1234.0000000000000000000000001"),
+        testBasicDeserialization(config, DecimalFormatInfo.INSTANCE,
+                new BigDecimal("1234.0000000000000000000000001"),
                 "1234.0000000000000000000000001");
-        testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), 
Date.valueOf("2020-03-22"), "22/03/2020");
+        testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"),
+                Date.valueOf("2020-03-22"), "22/03/2020");
         testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), 
Time.valueOf("11:12:13"), "13/12/11");
         testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy 
hh:mm:ss"),
                 Timestamp.valueOf("2020-03-22 11:12:13"), "22/03/2020 
11:12:13");
@@ -101,7 +103,8 @@ public class CsvDeserializationSchemaTest {
         testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null, 
nullLiteral);
         testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), 
null, nullLiteral);
         testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null, 
nullLiteral);
-        testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy 
hh:mm:ss"), null, nullLiteral);
+        testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy 
hh:mm:ss"),
+                null, nullLiteral);
     }
 
     @Test
@@ -179,7 +182,7 @@ public class CsvDeserializationSchemaTest {
                 "1,field1,field2,field3,field4".getBytes());
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void testErrors() throws Exception {
         Consumer<CsvDeserializationSchema.Builder> config = builder -> {
         };
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 7a65ee7e8a..c2c0239518 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -203,7 +203,7 @@ public class InLongMsgCsvFormatDeserializerTest {
         List<Row> actualRows = new ArrayList<>();
         Collector<Row> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(1, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg2 = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -461,6 +461,16 @@ public class InLongMsgCsvFormatDeserializerTest {
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
 
+        Row expectedRow1 = Row.of(
+                Timestamp.valueOf("2020-03-22 00:00:00"),
+                expectedAttributes,
+                1,
+                2,
+                null,
+                "field11",
+                "field12",
+                "field13");
+
         Row expectedRow2 = Row.of(
                 Timestamp.valueOf("2020-03-22 00:00:00"),
                 expectedAttributes,
@@ -471,10 +481,10 @@ public class InLongMsgCsvFormatDeserializerTest {
                 "field22",
                 "field23");
 
-        testRowDeserialization(
-                deserializer,
-                inLongMsg1.buildArray(),
-                Collections.singletonList(expectedRow2));
+        List list = new ArrayList();
+        list.add(expectedRow1);
+        list.add(expectedRow2);
+        testRowDeserialization(deserializer, inLongMsg1.buildArray(), list);
     }
 
     @Test
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index c29258ee42..abfb25a536 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -87,7 +87,7 @@ public class InLongMsgKvFormatDeserializerTest {
         List<Row> actualRows = new ArrayList<>();
         Collector<Row> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(1, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg InLongMsgHead = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 545ea9d2ae..fae60c03a4 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -86,7 +86,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         List<Row> actualRows = new ArrayList<>();
         Collector<Row> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(2, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
diff --git 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index 4ac7d3edb2..72d9604719 100644
--- 
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -89,7 +89,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         List<Row> actualRows = new ArrayList<>();
         Collector<Row> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(2, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
diff --git 
a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
index 5474e68d2a..144c2aa889 100644
--- 
a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
+++ 
b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java
@@ -169,7 +169,7 @@ public class KvDeserializationSchemaTest {
                 "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16));
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void testErrors() throws Exception {
         Consumer<KvDeserializationSchema.Builder> config = builder -> {
         };
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 9857c6ece0..fcd6f468e7 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -286,7 +286,7 @@ public class InLongMsgCsvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(1, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -545,20 +545,33 @@ public class InLongMsgCsvFormatDeserializerTest {
         expectedAttributes.put("__addcol1__", "1");
         expectedAttributes.put("__addcol2__", "2");
 
-        GenericRowData expectRowData = new GenericRowData(8);
-        expectRowData.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
-        expectRowData.setField(1, mapConvert.convert(expectedAttributes));
-        expectRowData.setField(2, 1);
-        expectRowData.setField(3, 2);
-        expectRowData.setField(4, 123);
-        expectRowData.setField(5, StringData.fromString("field21"));
-        expectRowData.setField(6, StringData.fromString("field22"));
-        expectRowData.setField(7, StringData.fromString("field23"));
+        GenericRowData expectRowData1 = new GenericRowData(8);
+        expectRowData1.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData1.setField(2, 1);
+        expectRowData1.setField(3, 2);
+        expectRowData1.setField(4, null);
+        expectRowData1.setField(5, StringData.fromString("field11"));
+        expectRowData1.setField(6, StringData.fromString("field12"));
+        expectRowData1.setField(7, StringData.fromString("field13"));
+
+        GenericRowData expectRowData2 = new GenericRowData(8);
+        expectRowData2.setField(0, 
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+        expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+        expectRowData2.setField(2, 1);
+        expectRowData2.setField(3, 2);
+        expectRowData2.setField(4, 123);
+        expectRowData2.setField(5, StringData.fromString("field21"));
+        expectRowData2.setField(6, StringData.fromString("field22"));
+        expectRowData2.setField(7, StringData.fromString("field23"));
 
+        List expectList = new ArrayList();
+        expectList.add(expectRowData1);
+        expectList.add(expectRowData2);
         testRowDeserialization(
                 deserializer,
                 inLongMsg.buildArray(),
-                Collections.singletonList(expectRowData));
+                expectList);
     }
 
     @Test
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index 39f3fd83d7..9025cbc439 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -96,7 +96,7 @@ public class InLongMsgKvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg.buildArray(), collector);
-        assertEquals(1, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
         String abNormalAttrs = 
"m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2";
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index ee78cd9ce2..664e8de31c 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -103,7 +103,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
         List<RowData> actualRows = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRows);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(2, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index d76b63ba41..e797a9774f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -98,7 +98,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
         List<RowData> actualRowDatas = new ArrayList<>();
         Collector<RowData> collector = new ListCollector<>(actualRowDatas);
         deserializer.flatMap(inLongMsg1.buildArray(), collector);
-        assertEquals(2, errorHandler.getRowCount());
+        assertEquals(0, errorHandler.getRowCount());
 
         InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
         String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index 8a05de9b92..c2d55af15d 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.formats.base;
 
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler;
 import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -39,15 +42,7 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeserializationSchema.class);
 
-    /**
-     * If true, the deserialization error will be ignored.
-     */
-    private final boolean ignoreErrors;
-
-    /**
-     * If true, a parsing error is occurred.
-     */
-    private boolean errorOccurred = false;
+    protected FailureHandler failureHandler;
 
     /**
      * The format metric group.
@@ -55,7 +50,15 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
     protected transient FormatMetricGroup formatMetricGroup;
 
     public DefaultDeserializationSchema(boolean ignoreErrors) {
-        this.ignoreErrors = ignoreErrors;
+        if (ignoreErrors) {
+            failureHandler = new IgnoreFailureHandler();
+        } else {
+            failureHandler = new NoOpFailureHandler();
+        }
+    }
+
+    public DefaultDeserializationSchema(FailureHandler failureHandler) {
+        this.failureHandler = failureHandler;
     }
 
     @Override
@@ -82,14 +85,12 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
         try {
             T result = deserializeInternal(bytes);
             // reset error state after deserialize success
-            errorOccurred = false;
             return result;
         } catch (Exception e) {
-            errorOccurred = true;
             if (formatMetricGroup != null) {
                 formatMetricGroup.getNumRecordsDeserializeError().inc(1L);
             }
-            if (ignoreErrors) {
+            if (failureHandler != null && failureHandler.isIgnoreFailure()) {
                 if (formatMetricGroup != null) {
                     
formatMetricGroup.getNumRecordsDeserializeErrorIgnored().inc(1L);
                 }
@@ -106,11 +107,7 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
         }
     }
 
-    public boolean skipCurrentRecord(T element) {
-        return ignoreErrors && errorOccurred;
-    }
-
-    protected abstract T deserializeInternal(byte[] bytes) throws IOException;
+    protected abstract T deserializeInternal(byte[] bytes) throws Exception;
 
     @Override
     public boolean equals(Object object) {
@@ -121,11 +118,11 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
             return false;
         }
         DefaultDeserializationSchema<?> that = 
(DefaultDeserializationSchema<?>) object;
-        return Objects.equals(ignoreErrors, that.ignoreErrors);
+        return Objects.equals(failureHandler, that.failureHandler);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(ignoreErrors);
+        return Objects.hash(failureHandler);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 71c19a8cc8..39126370a7 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -40,7 +40,6 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getDataType;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link TableFormatForRowDataUtils}.
@@ -188,23 +187,24 @@ public class TableFormatUtilsTest {
         assertEquals(1, result1);
 
         try {
-            deserializeBasicField(
+            Object result2 = deserializeBasicField(
                     "f",
                     IntFormatInfo.INSTANCE,
                     "",
                     "n/a");
-            fail("The method is expected to throw an exception.");
+
+            assertNull(result2);
         } catch (Exception e) {
             // ignored
         }
 
-        Object result2 =
+        Object result3 =
                 deserializeBasicField(
                         "f",
                         IntFormatInfo.INSTANCE,
                         "n/a",
                         "n/a");
-        assertNull(result2);
+        assertNull(result3);
     }
 
     @Test
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index ee36e98126..4541d0d555 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -210,7 +210,7 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
     }
 
     @Override
-    public RowData deserializeInternal(@Nullable byte[] message) {
+    public RowData deserializeInternal(@Nullable byte[] message) throws 
Exception {
         if (message == null) {
             return null;
         }
@@ -241,12 +241,12 @@ public final class CsvRowDataDeserializationSchema 
extends DefaultDeserializatio
                     rowData.setField(i, converters[i].convert(field));
                 }
             }
-
             return rowData;
         } catch (Throwable t) {
-            throw new RuntimeException(
-                    String.format("Could not properly deserialize csv. 
Text=[%s].", text), t);
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize csv. 
Text=[{}].", text), t));
         }
+        return null;
     }
 
     @Override
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
index 6d0c848e57..d804bc6fa9 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java
@@ -275,7 +275,7 @@ public class CsvRowDataDeserializationSchemaTest extends 
TestLogger {
                 false);
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void testErrors() throws Exception {
         Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
         };
@@ -295,9 +295,14 @@ public class CsvRowDataDeserializationSchemaTest extends 
TestLogger {
     public void testIgnoreErrors() throws Exception {
         Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> {
         };
+        GenericRowData rowData = new GenericRowData(4);
+        rowData.setField(0, null);
+        rowData.setField(1, StringData.fromString("field1"));
+        rowData.setField(2, StringData.fromString("field2"));
+        rowData.setField(3, StringData.fromString("field3"));
         testRowDataDeserialization(
                 config,
-                null,
+                rowData,
                 "na,field1,field2,field3".getBytes(),
                 true);
     }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
index 88394f6560..386f75f998 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -104,13 +103,20 @@ public class JsonRowDataDeserializationSchema extends 
DefaultDeserializationSche
     }
 
     @Override
-    public RowData deserializeInternal(@Nullable byte[] message) throws 
IOException {
+    public RowData deserializeInternal(@Nullable byte[] message) throws 
Exception {
         if (message == null) {
             return null;
         }
 
         String jsonStr = new String(message, charset);
-        return (RowData) runtimeConverter.convert(jsonStr);
+        RowData rowData = null;
+        try {
+            rowData = (RowData) runtimeConverter.convert(jsonStr);
+        } catch (Throwable t) {
+            failureHandler.onParsingMsgFailure(jsonStr, new RuntimeException(
+                    String.format("Could not properly deserialize json. 
Text=[%s].", jsonStr), t));
+        }
+        return rowData;
     }
 
     @Override
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
index 01fdde20f8..c0b2b08c72 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java
@@ -36,6 +36,7 @@ import java.sql.Timestamp;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -126,6 +127,7 @@ public class FieldToRowDataConvertersTest {
 
     @Test
     public void testConvertToTimestampWithLocalZone() throws IOException {
+        TimeZone.setDefault(TimeZone.getDefault().getTimeZone("GMT+0"));
         FieldToRowDataConverter converter =
                 
converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType());
         TimestampData expected = TimestampData.fromTimestamp(new Timestamp(0));
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
index 9d0bf64cea..2e9e62a889 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java
@@ -35,7 +35,6 @@ import java.util.Objects;
 import static org.apache.inlong.sort.formats.base.TextFormatOptions.CHARSET;
 import static 
org.apache.inlong.sort.formats.base.TextFormatOptionsUtil.ISO_8601;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -65,7 +64,6 @@ public class JsonRowDataDeserializationSchemaTest extends 
JsonRowDataSerDeTestBa
 
         assertEquals(testRowData, rowData);
         assertNotNull(rowData);
-        assertFalse(deserializationSchema.skipCurrentRecord(rowData));
     }
 
     // @Test
@@ -128,10 +126,8 @@ public class JsonRowDataDeserializationSchemaTest extends 
JsonRowDataSerDeTestBa
                         .build();
         RowData rowData = deserializationSchema.deserialize(data);
         assertNull(rowData);
-        assertTrue(deserializationSchema.skipCurrentRecord(rowData));
 
         rowData = 
deserializationSchema.deserialize((testJson.getBytes(CHARSET.defaultValue())));
         assertNotNull(rowData);
-        assertFalse(deserializationSchema.skipCurrentRecord(rowData));
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
index 24b2875e40..dee4961f2e 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java
@@ -35,6 +35,7 @@ import java.sql.Timestamp;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -68,6 +69,7 @@ public abstract class JsonRowDataSerDeTestBase {
 
     @Before
     public void init() {
+        TimeZone.setDefault(TimeZone.getDefault().getTimeZone("GMT+0"));
         byte[] bytes = new byte[10];
         ThreadLocalRandom.current().nextBytes(bytes);
         String base64Str = Base64.getEncoder().encodeToString(bytes);
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
index ad4ce0fc45..63bfd28ed9 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java
@@ -35,6 +35,7 @@ import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.flink.table.api.DataTypes.ARRAY;
@@ -125,12 +126,11 @@ public class RowDataToFieldConvertersTest {
 
     @Test
     public void testConvertTimestampWithLocalZone() {
+        TimeZone.setDefault(TimeZone.getDefault().getTimeZone("GMT+0"));
         RowDataToFieldConverter converter =
                 
converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType());
         TimestampData testTimestampData = TimestampData.fromTimestamp(new 
Timestamp(0));
-        // assertEquals("1970-01-01 00:00:00Z", 
converter.convert(testTimestampData));
-        assertTrue("1970-01-01 
00:00:00Z".equals(converter.convert(testTimestampData))
-                || "1970-01-01 
08:00:00Z".equals(converter.convert(testTimestampData)));
+        assertEquals("1970-01-01 00:00:00Z", 
converter.convert(testTimestampData));
     }
 
     @Test
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index d0bb4f1735..033a51135e 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -145,8 +145,9 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
     }
 
     @Override
-    public RowData deserializeInternal(byte[] bytes) throws IOException {
+    public RowData deserializeInternal(byte[] bytes) throws Exception {
         String text = new String(bytes, Charset.forName(charset));
+        GenericRowData rowData = null;
         try {
             Map<String, String> fieldTexts =
                     splitKv(text, entryDelimiter, kvDelimiter, escapeChar, 
quoteChar);
@@ -154,7 +155,7 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
             String[] fieldNames = rowFormatInfo.getFieldNames();
             FormatInfo[] fieldFormatInfos = 
rowFormatInfo.getFieldFormatInfos();
 
-            GenericRowData rowData = new 
GenericRowData(fieldFormatInfos.length);
+            rowData = new GenericRowData(fieldFormatInfos.length);
             for (int i = 0; i < fieldFormatInfos.length; i++) {
                 String fieldName = fieldNames[i];
                 FormatInfo fieldFormatInfo = fieldFormatInfos[i];
@@ -170,9 +171,10 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
             }
             return rowData;
         } catch (Throwable t) {
-            throw new IOException(
-                    String.format("Could not properly deserialize kv. 
Text=[%s].", text), t);
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize kv. 
Text=[{}].", text), t));
         }
+        return null;
     }
 
     @Override
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
index a0350905e3..bcc0dc3911 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java
@@ -231,7 +231,7 @@ public class KvRowDataDeserializationSchemaTest {
                 "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16));
     }
 
-    @Test(expected = Exception.class)
+    @Test
     public void testErrors() throws Exception {
         Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> {
         };


Reply via email to