This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5a58040909 [INLONG-9791][Sort] Return null instead of throwing an exception when deserialization by type fails (#9792) 5a58040909 is described below commit 5a58040909423aebd25a2bf40df80a23480bf047 Author: baomingyu <baomingy...@163.com> AuthorDate: Tue Mar 12 11:22:30 2024 +0800 [INLONG-9791][Sort] Return null instead of throwing an exception when deserialization by type fails (#9792) --- .../inlong/sort/formats/base/TableFormatUtils.java | 9 ++++-- .../sort/formats/inlongmsg/FailureHandler.java | 20 ++++++++++++ .../formats/inlongmsg/IgnoreFailureHandler.java | 10 ++++++ .../sort/formats/inlongmsg/NoOpFailureHandler.java | 11 +++++++ .../sort/formats/base/TableFormatUtilsTest.java | 10 +++--- .../formats/csv/CsvDeserializationSchemaTest.java | 11 ++++--- .../InLongMsgCsvFormatDeserializerTest.java | 20 +++++++++--- .../InLongMsgKvFormatDeserializerTest.java | 2 +- .../InLongMsgTlogCsvFormatDeserializerTest.java | 2 +- .../InLongMsgTlogKvFormatDeserializerTest.java | 2 +- .../formats/kv/KvDeserializationSchemaTest.java | 2 +- .../InLongMsgCsvFormatDeserializerTest.java | 35 +++++++++++++------- .../InLongMsgKvFormatDeserializerTest.java | 2 +- .../InLongMsgTlogCsvFormatDeserializerTest.java | 2 +- .../InLongMsgTlogKvFormatDeserializerTest.java | 2 +- .../formats/base/DefaultDeserializationSchema.java | 37 ++++++++++------------ .../sort/formats/base/TableFormatUtilsTest.java | 10 +++--- .../csv/CsvRowDataDeserializationSchema.java | 8 ++--- .../csv/CsvRowDataDeserializationSchemaTest.java | 9 ++++-- .../json/JsonRowDataDeserializationSchema.java | 12 +++++-- .../formats/json/FieldToRowDataConvertersTest.java | 2 ++ .../json/JsonRowDataDeserializationSchemaTest.java | 4 --- .../formats/json/JsonRowDataSerDeTestBase.java | 2 ++ .../formats/json/RowDataToFieldConvertersTest.java | 6 ++-- .../formats/kv/KvRowDataDeserializationSchema.java | 10 +++--- .../kv/KvRowDataDeserializationSchemaTest.java 
| 2 +- 26 files changed, 162 insertions(+), 80 deletions(-) diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java index d83108de39..430b89b32f 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java @@ -92,6 +92,8 @@ import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.lang.reflect.Array; import java.math.BigDecimal; @@ -111,6 +113,8 @@ import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SC */ public class TableFormatUtils { + private static final Logger LOG = LoggerFactory.getLogger(TableFormatUtils.class); + /** * Returns the {@link DeserializationSchema} described by the given * properties. 
@@ -567,9 +571,10 @@ public class TableFormatUtils { try { return ((BasicFormatInfo<?>) fieldFormatInfo).deserialize(fieldText); } catch (Exception e) { - throw new RuntimeException("Could not properly deserialize the " - + "text " + fieldText + " for field " + fieldName + ".", e); + LOG.warn("Could not properly deserialize the " + "text " + + fieldText + " for field " + fieldName + ".", e); } + return null; } /** diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java index 7dff44fa10..6bdbf36103 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java @@ -24,6 +24,25 @@ import java.io.Serializable; */ public interface FailureHandler extends Serializable { + /** + * Returns whether parsing failures should be ignored instead of rethrown. + * + * @return true if parsing failures should be ignored, false otherwise + */ + default boolean isIgnoreFailure() { + return false; + }; + + /** + * This method is called when a failure occurs while parsing a non-InLong message. + * + * @param msg the message that could not be parsed + * @param exception the thrown exception + * @throws Exception the exception + */ + default void onParsingMsgFailure(Object msg, Exception exception) throws Exception { + }; + /** * This method is called when there is a failure occurred while parsing InLongMsg head.
* @@ -51,4 +70,5 @@ public interface FailureHandler extends Serializable { * @throws Exception the exception */ void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Exception exception) throws Exception; + } diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java index 6ac0b956f2..b27fefc891 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java @@ -27,6 +27,11 @@ public class IgnoreFailureHandler implements FailureHandler { private static final Logger LOG = LoggerFactory.getLogger(IgnoreFailureHandler.class); + @Override + public void onParsingMsgFailure(Object msg, Exception exception) { + LOG.error("Could not properly deserialize msg=[{}].", msg, exception); + }; + @Override public void onParsingHeadFailure(String attribute, Exception exception) { LOG.warn("Cannot properly parse the head {}", attribute, exception); @@ -42,6 +47,11 @@ public class IgnoreFailureHandler implements FailureHandler { LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, exception); } + @Override + public boolean isIgnoreFailure() { + return true; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java index 66eeb19e69..d049920be1 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java +++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java @@ -27,6 +27,12 @@ public class NoOpFailureHandler implements FailureHandler { private static final Logger LOG = LoggerFactory.getLogger(NoOpFailureHandler.class); + @Override + public void onParsingMsgFailure(Object msg, Exception t) throws Exception { + LOG.error("Could not properly serialize msg=[{}].", msg, t); + throw t; + } + @Override public void onParsingHeadFailure(String attribute, Exception exception) throws Exception { LOG.error("Cannot properly parse the head {}", attribute, exception); @@ -45,6 +51,11 @@ public class NoOpFailureHandler implements FailureHandler { throw exception; } + @Override + public boolean isIgnoreFailure() { + return false; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java index 9aada97b24..189fda1e0e 100644 --- a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java +++ b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java @@ -20,13 +20,13 @@ package org.apache.inlong.sort.formats.base; import org.apache.inlong.sort.formats.common.IntFormatInfo; import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.junit.Assert; import org.junit.Test; import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; import static org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; /** * Tests for {@link TableFormatUtils}. 
@@ -174,23 +174,23 @@ public class TableFormatUtilsTest { assertEquals(1, result1); try { - deserializeBasicField( + Object result2 = deserializeBasicField( "f", IntFormatInfo.INSTANCE, "", "n/a"); - fail("The method is expected to throw an exception."); + Assert.assertEquals(null, result2); } catch (Exception e) { // ignored } - Object result2 = + Object result3 = deserializeBasicField( "f", IntFormatInfo.INSTANCE, "n/a", "n/a"); - assertNull(result2); + assertNull(result3); } @Test diff --git a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java index c7cf93fe9c..9ddc0c4a74 100644 --- a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java +++ b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchemaTest.java @@ -75,9 +75,11 @@ public class CsvDeserializationSchemaTest { testBasicDeserialization(config, LongFormatInfo.INSTANCE, 12345678910L, "12345678910"); testBasicDeserialization(config, FloatFormatInfo.INSTANCE, 0.33333334f, "0.33333334"); testBasicDeserialization(config, DoubleFormatInfo.INSTANCE, 0.33333333332, "0.33333333332"); - testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, new BigDecimal("1234.0000000000000000000000001"), + testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, + new BigDecimal("1234.0000000000000000000000001"), "1234.0000000000000000000000001"); - testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), Date.valueOf("2020-03-22"), "22/03/2020"); + testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), + Date.valueOf("2020-03-22"), "22/03/2020"); testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), Time.valueOf("11:12:13"), "13/12/11"); 
testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), Timestamp.valueOf("2020-03-22 11:12:13"), "22/03/2020 11:12:13"); @@ -101,7 +103,8 @@ public class CsvDeserializationSchemaTest { testBasicDeserialization(config, DecimalFormatInfo.INSTANCE, null, nullLiteral); testBasicDeserialization(config, new DateFormatInfo("dd/MM/yyyy"), null, nullLiteral); testBasicDeserialization(config, new TimeFormatInfo("ss/mm/hh"), null, nullLiteral); - testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), null, nullLiteral); + testBasicDeserialization(config, new TimestampFormatInfo("dd/MM/yyyy hh:mm:ss"), + null, nullLiteral); } @Test @@ -179,7 +182,7 @@ public class CsvDeserializationSchemaTest { "1,field1,field2,field3,field4".getBytes()); } - @Test(expected = Exception.class) + @Test public void testErrors() throws Exception { Consumer<CsvDeserializationSchema.Builder> config = builder -> { }; diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java index 7a65ee7e8a..c2c0239518 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java @@ -203,7 +203,7 @@ public class InLongMsgCsvFormatDeserializerTest { List<Row> actualRows = new ArrayList<>(); Collector<Row> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg.buildArray(), collector); - assertEquals(1, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg2 = 
InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2"; @@ -461,6 +461,16 @@ public class InLongMsgCsvFormatDeserializerTest { expectedAttributes.put("__addcol1__", "1"); expectedAttributes.put("__addcol2__", "2"); + Row expectedRow1 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + null, + "field11", + "field12", + "field13"); + Row expectedRow2 = Row.of( Timestamp.valueOf("2020-03-22 00:00:00"), expectedAttributes, @@ -471,10 +481,10 @@ public class InLongMsgCsvFormatDeserializerTest { "field22", "field23"); - testRowDeserialization( - deserializer, - inLongMsg1.buildArray(), - Collections.singletonList(expectedRow2)); + List list = new ArrayList(); + list.add(expectedRow1); + list.add(expectedRow2); + testRowDeserialization(deserializer, inLongMsg1.buildArray(), list); } @Test diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java index c29258ee42..abfb25a536 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java @@ -87,7 +87,7 @@ public class InLongMsgKvFormatDeserializerTest { List<Row> actualRows = new ArrayList<>(); Collector<Row> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg.buildArray(), collector); - assertEquals(1, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg InLongMsgHead = InLongMsg.newInLongMsg(); String abNormalAttrs = 
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2"; diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java index 545ea9d2ae..fae60c03a4 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -86,7 +86,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest { List<Row> actualRows = new ArrayList<>(); Collector<Row> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg1.buildArray(), collector); - assertEquals(2, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test"; diff --git a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java index 4ac7d3edb2..72d9604719 100644 --- a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java @@ -89,7 +89,7 @@ public class 
InLongMsgTlogKvFormatDeserializerTest { List<Row> actualRows = new ArrayList<>(); Collector<Row> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg1.buildArray(), collector); - assertEquals(2, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes"; diff --git a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java index 5474e68d2a..144c2aa889 100644 --- a/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java +++ b/inlong-sort/sort-formats/format-row/format-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchemaTest.java @@ -169,7 +169,7 @@ public class KvDeserializationSchemaTest { "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16)); } - @Test(expected = Exception.class) + @Test public void testErrors() throws Exception { Consumer<KvDeserializationSchema.Builder> config = builder -> { }; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java index 9857c6ece0..fcd6f468e7 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java @@ -286,7 
+286,7 @@ public class InLongMsgCsvFormatDeserializerTest { List<RowData> actualRows = new ArrayList<>(); Collector<RowData> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg.buildArray(), collector); - assertEquals(1, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2"; @@ -545,20 +545,33 @@ public class InLongMsgCsvFormatDeserializerTest { expectedAttributes.put("__addcol1__", "1"); expectedAttributes.put("__addcol2__", "2"); - GenericRowData expectRowData = new GenericRowData(8); - expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"))); - expectRowData.setField(1, mapConvert.convert(expectedAttributes)); - expectRowData.setField(2, 1); - expectRowData.setField(3, 2); - expectRowData.setField(4, 123); - expectRowData.setField(5, StringData.fromString("field21")); - expectRowData.setField(6, StringData.fromString("field22")); - expectRowData.setField(7, StringData.fromString("field23")); + GenericRowData expectRowData1 = new GenericRowData(8); + expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"))); + expectRowData1.setField(1, mapConvert.convert(expectedAttributes)); + expectRowData1.setField(2, 1); + expectRowData1.setField(3, 2); + expectRowData1.setField(4, null); + expectRowData1.setField(5, StringData.fromString("field11")); + expectRowData1.setField(6, StringData.fromString("field12")); + expectRowData1.setField(7, StringData.fromString("field13")); + + GenericRowData expectRowData2 = new GenericRowData(8); + expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00"))); + expectRowData2.setField(1, mapConvert.convert(expectedAttributes)); + expectRowData2.setField(2, 1); + expectRowData2.setField(3, 2); + expectRowData2.setField(4, 123); + 
expectRowData2.setField(5, StringData.fromString("field21")); + expectRowData2.setField(6, StringData.fromString("field22")); + expectRowData2.setField(7, StringData.fromString("field23")); + List expectList = new ArrayList(); + expectList.add(expectRowData1); + expectList.add(expectRowData2); testRowDeserialization( deserializer, inLongMsg.buildArray(), - Collections.singletonList(expectRowData)); + expectList); } @Test diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java index 39f3fd83d7..9025cbc439 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java @@ -96,7 +96,7 @@ public class InLongMsgKvFormatDeserializerTest { List<RowData> actualRows = new ArrayList<>(); Collector<RowData> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg.buildArray(), collector); - assertEquals(1, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2"; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java index 
ee78cd9ce2..664e8de31c 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java @@ -103,7 +103,7 @@ public class InLongMsgTlogCsvFormatDeserializerTest { List<RowData> actualRows = new ArrayList<>(); Collector<RowData> collector = new ListCollector<>(actualRows); deserializer.flatMap(inLongMsg1.buildArray(), collector); - assertEquals(2, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test"; diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java index d76b63ba41..e797a9774f 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java @@ -98,7 +98,7 @@ public class InLongMsgTlogKvFormatDeserializerTest { List<RowData> actualRowDatas = new ArrayList<>(); Collector<RowData> collector = new ListCollector<>(actualRowDatas); deserializer.flatMap(inLongMsg1.buildArray(), collector); - assertEquals(2, errorHandler.getRowCount()); + assertEquals(0, errorHandler.getRowCount()); InLongMsg inLongMsg1Head = 
InLongMsg.newInLongMsg(); String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes"; diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java index 8a05de9b92..c2d55af15d 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java @@ -17,6 +17,9 @@ package org.apache.inlong.sort.formats.base; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.IgnoreFailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.NoOpFailureHandler; import org.apache.inlong.sort.formats.metrics.FormatMetricGroup; import org.apache.flink.api.common.serialization.DeserializationSchema; @@ -39,15 +42,7 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization private static final Logger LOG = LoggerFactory.getLogger(DefaultDeserializationSchema.class); - /** - * If true, the deserialization error will be ignored. - */ - private final boolean ignoreErrors; - - /** - * If true, a parsing error is occurred. - */ - private boolean errorOccurred = false; + /** Failure strategy consulted by deserialize() and by subclasses on parse errors; a null handler means failures are not ignored. */ protected FailureHandler failureHandler; /** * The format metric group.
@@ -55,7 +50,15 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization protected transient FormatMetricGroup formatMetricGroup; public DefaultDeserializationSchema(boolean ignoreErrors) { - this.ignoreErrors = ignoreErrors; + if (ignoreErrors) { /* map the legacy boolean flag onto a FailureHandler */ + failureHandler = new IgnoreFailureHandler(); + } else { + failureHandler = new NoOpFailureHandler(); + } + } + + /** Creates a schema that delegates deserialization-failure handling to {@code failureHandler}. */ public DefaultDeserializationSchema(FailureHandler failureHandler) { + this.failureHandler = failureHandler; } @Override @@ -82,14 +85,12 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization try { T result = deserializeInternal(bytes); // reset error state after deserialize success - errorOccurred = false; return result; } catch (Exception e) { - errorOccurred = true; if (formatMetricGroup != null) { formatMetricGroup.getNumRecordsDeserializeError().inc(1L); } - if (ignoreErrors) { + if (failureHandler != null && failureHandler.isIgnoreFailure()) { /* a null handler is treated as "do not ignore" */ if (formatMetricGroup != null) { formatMetricGroup.getNumRecordsDeserializeErrorIgnored().inc(1L); } @@ -106,11 +107,7 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization } } - public boolean skipCurrentRecord(T element) { - return ignoreErrors && errorOccurred; - } - - protected abstract T deserializeInternal(byte[] bytes) throws IOException; + /** Parses one record. Implementations may report parse errors to {@code failureHandler} and return null; an Exception that escapes is counted and handled by deserialize(). */ protected abstract T deserializeInternal(byte[] bytes) throws Exception; @Override public boolean equals(Object object) { @@ -121,11 +118,11 @@ public abstract class DefaultDeserializationSchema<T> implements Deserialization return false; } DefaultDeserializationSchema<?> that = (DefaultDeserializationSchema<?>) object; - return Objects.equals(ignoreErrors, that.ignoreErrors); + return Objects.equals(failureHandler, that.failureHandler); /* NOTE(review): equality now relies on FailureHandler implementations defining equals/hashCode - confirm */ } @Override public int hashCode() { - return Objects.hash(ignoreErrors); + return Objects.hash(failureHandler); } } diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java index 71c19a8cc8..39126370a7 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java @@ -40,7 +40,6 @@ import static org.apache.inlong.sort.formats.base.TableFormatUtils.getDataType; import static org.apache.inlong.sort.formats.base.TableFormatUtils.serializeBasicField; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; /** * Tests for {@link TableFormatForRowDataUtils}. @@ -188,23 +187,24 @@ public class TableFormatUtilsTest { assertEquals(1, result1); try { - deserializeBasicField( + Object result2 = deserializeBasicField( "f", IntFormatInfo.INSTANCE, "", "n/a"); - fail("The method is expected to throw an exception."); + + assertNull(result2); /* NOTE(review): the catch (Exception) below also lets this case pass if deserializeBasicField throws; drop the try/catch if returning null is now the contract */ } catch (Exception e) { // ignored } - Object result2 = + Object result3 = deserializeBasicField( "f", IntFormatInfo.INSTANCE, "n/a", "n/a"); - assertNull(result2); + assertNull(result3); } @Test diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java index ee36e98126..4541d0d555 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java +++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java @@ -210,7 +210,7 @@ public final class CsvRowDataDeserializationSchema extends DefaultDeserializatio } @Override - public RowData deserializeInternal(@Nullable byte[] message) { + public RowData deserializeInternal(@Nullable byte[] message) throws Exception { if (message == null) { return null; } @@ -241,12 +241,12 @@ public final class CsvRowDataDeserializationSchema extends DefaultDeserializatio rowData.setField(i, converters[i].convert(field)); } } - return rowData; } catch (Throwable t) { - throw new RuntimeException( - String.format("Could not properly deserialize csv. Text=[%s].", text), t); + failureHandler.onParsingMsgFailure(text, new RuntimeException( + String.format("Could not properly deserialize csv. Text=[%s].", text), t)); /* String.format needs %s; an SLF4J-style {} placeholder would silently drop the offending text */ } + return null; } @Override diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java index 6d0c848e57..d804bc6fa9 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchemaTest.java @@ -275,7 +275,7 @@ public class CsvRowDataDeserializationSchemaTest extends TestLogger { false); } - @Test(expected = Exception.class) + @Test public void testErrors() throws Exception { Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> { }; @@ -295,9 +295,14 @@ public class CsvRowDataDeserializationSchemaTest extends TestLogger { public void testIgnoreErrors() throws Exception {
Consumer<CsvRowDataDeserializationSchema.Builder> config = builder -> { }; + /* the unparsable "na" in field 0 is now expected as a null field rather than a dropped (null) row */ GenericRowData rowData = new GenericRowData(4); + rowData.setField(0, null); + rowData.setField(1, StringData.fromString("field1")); + rowData.setField(2, StringData.fromString("field2")); + rowData.setField(3, StringData.fromString("field3")); testRowDataDeserialization( config, - null, + rowData, "na,field1,field2,field3".getBytes(), true); } diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java index 88394f6560..386f75f998 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java @@ -30,7 +30,6 @@ import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Objects; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -104,13 +103,20 @@ public class JsonRowDataDeserializationSchema extends DefaultDeserializationSche } @Override - public RowData deserializeInternal(@Nullable byte[] message) throws IOException { + public RowData deserializeInternal(@Nullable byte[] message) throws Exception { if (message == null) { return null; } String jsonStr = new String(message, charset); - return (RowData) runtimeConverter.convert(jsonStr); + RowData rowData = null; /* stays null when conversion fails and the failure handler returns normally */ + try { + rowData = (RowData) runtimeConverter.convert(jsonStr); + } catch (Throwable t) { + failureHandler.onParsingMsgFailure(jsonStr, new RuntimeException( + String.format("Could not properly deserialize json. 
Text=[%s].", jsonStr), t)); + } + return rowData; } @Override diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java index 01fdde20f8..c0b2b08c72 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/FieldToRowDataConvertersTest.java @@ -36,6 +36,7 @@ import java.sql.Timestamp; import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.table.api.DataTypes.ARRAY; @@ -126,6 +127,7 @@ public class FieldToRowDataConvertersTest { @Test public void testConvertToTimestampWithLocalZone() throws IOException { + TimeZone.setDefault(TimeZone.getTimeZone("GMT+0")); /* getTimeZone is static; NOTE(review): this changes the JVM-wide default zone and never restores it */ FieldToRowDataConverter converter = converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType()); TimestampData expected = TimestampData.fromTimestamp(new Timestamp(0)); diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java index 9d0bf64cea..2e9e62a889 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java +++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchemaTest.java @@ -35,7 +35,6 @@ import java.util.Objects; import static org.apache.inlong.sort.formats.base.TextFormatOptions.CHARSET; import static org.apache.inlong.sort.formats.base.TextFormatOptionsUtil.ISO_8601; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -65,7 +64,6 @@ public class JsonRowDataDeserializationSchemaTest extends JsonRowDataSerDeTestBa assertEquals(testRowData, rowData); assertNotNull(rowData); - assertFalse(deserializationSchema.skipCurrentRecord(rowData)); } // @Test @@ -128,10 +126,8 @@ public class JsonRowDataDeserializationSchemaTest extends JsonRowDataSerDeTestBa .build(); RowData rowData = deserializationSchema.deserialize(data); assertNull(rowData); - assertTrue(deserializationSchema.skipCurrentRecord(rowData)); rowData = deserializationSchema.deserialize((testJson.getBytes(CHARSET.defaultValue()))); assertNotNull(rowData); - assertFalse(deserializationSchema.skipCurrentRecord(rowData)); } } diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java index 24b2875e40..dee4961f2e 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/JsonRowDataSerDeTestBase.java @@ -35,6 +35,7 @@ import java.sql.Timestamp; import java.util.Base64; import java.util.HashMap; import java.util.Map; 
+import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.table.api.DataTypes.ARRAY; @@ -68,6 +69,7 @@ public abstract class JsonRowDataSerDeTestBase { @Before public void init() { + TimeZone.setDefault(TimeZone.getTimeZone("GMT+0")); /* getTimeZone is static; NOTE(review): this changes the JVM-wide default zone and never restores it */ byte[] bytes = new byte[10]; ThreadLocalRandom.current().nextBytes(bytes); String base64Str = Base64.getEncoder().encodeToString(bytes); diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java index ad4ce0fc45..63bfd28ed9 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/test/java/org/apache/inlong/sort/formats/json/RowDataToFieldConvertersTest.java @@ -35,6 +35,7 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.table.api.DataTypes.ARRAY; @@ -125,12 +126,11 @@ public class RowDataToFieldConvertersTest { @Test public void testConvertTimestampWithLocalZone() { + TimeZone.setDefault(TimeZone.getTimeZone("GMT+0")); /* getTimeZone is static; NOTE(review): this changes the JVM-wide default zone and never restores it */ RowDataToFieldConverter converter = converters.createConverter(TIMESTAMP_WITH_LOCAL_TIME_ZONE().getLogicalType()); TimestampData testTimestampData = TimestampData.fromTimestamp(new Timestamp(0)); - // assertEquals("1970-01-01 00:00:00Z", converter.convert(testTimestampData)); - assertTrue("1970-01-01 00:00:00Z".equals(converter.convert(testTimestampData)) - || "1970-01-01 08:00:00Z".equals(converter.convert(testTimestampData))); + assertEquals("1970-01-01 00:00:00Z", 
converter.convert(testTimestampData)); } @Test diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java index d0bb4f1735..033a51135e 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java @@ -145,8 +145,9 @@ public class KvRowDataDeserializationSchema extends DefaultDeserializationSchema } @Override - public RowData deserializeInternal(byte[] bytes) throws IOException { + public RowData deserializeInternal(byte[] bytes) throws Exception { String text = new String(bytes, Charset.forName(charset)); + GenericRowData rowData = null; /* NOTE(review): hoisted, but only read inside the try; the failure path returns null directly */ try { Map<String, String> fieldTexts = splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar); @@ -154,7 +155,7 @@ public class KvRowDataDeserializationSchema extends DefaultDeserializationSchema String[] fieldNames = rowFormatInfo.getFieldNames(); FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos(); - GenericRowData rowData = new GenericRowData(fieldFormatInfos.length); + rowData = new GenericRowData(fieldFormatInfos.length); for (int i = 0; i < fieldFormatInfos.length; i++) { String fieldName = fieldNames[i]; FormatInfo fieldFormatInfo = fieldFormatInfos[i]; @@ -170,9 +171,10 @@ public class KvRowDataDeserializationSchema extends DefaultDeserializationSchema } return rowData; } catch (Throwable t) { - throw new IOException( - String.format("Could not properly deserialize kv. Text=[%s].", text), t); + failureHandler.onParsingMsgFailure(text, new RuntimeException( + String.format("Could not properly deserialize kv. 
Text=[%s].", text), t)); /* String.format needs %s; an SLF4J-style {} placeholder would silently drop the offending text */ } + return null; } @Override diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java index a0350905e3..bcc0dc3911 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java +++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchemaTest.java @@ -231,7 +231,7 @@ public class KvRowDataDeserializationSchemaTest { "f1=10&f2=aa&f3=bb&f4=cc".getBytes(StandardCharsets.UTF_16)); } - @Test(expected = Exception.class) + @Test public void testErrors() throws Exception { Consumer<KvRowDataDeserializationSchema.Builder> config = builder -> { };