libenchao commented on code in PR #25919:
URL: https://github.com/apache/flink/pull/25919#discussion_r1928378346

##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -214,6 +215,7 @@ public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws
         if (message == null || message.length == 0) {
             return;
         }
+        List<GenericRowData> genericRowDataList = new ArrayList<>();

Review Comment:
   We can add a transient field to `CanalJsonDeserializationSchema` to avoid creating a new list for every record. It's more efficient, WDYT?

##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -73,10 +76,69 @@ void testFilteringTables() throws Exception {
                                 InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
                         .setDatabase("^my.*")
                         .setTable("^prod.*")
+                        .setIgnoreParseErrors(true)

Review Comment:
   What's the purpose of enabling ignore-parse-errors for this test case?

##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
             // do nothing
         }
     }
+
+    private static class SpecialCollector implements Collector<RowData> {

Review Comment:
   How about naming it `ThrowingExceptionCollector`?

##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
             // do nothing
         }
     }
+
+    private static class SpecialCollector implements Collector<RowData> {
+
+        private final List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            if (record.getRowKind().equals(RowKind.DELETE) && record.getInt(0) == 103) {

Review Comment:
   The criterion is too specific. Could we just throw an exception directly, no matter what the data is? We only need to test that when an exception occurs in a chained downstream operator, the exception is correctly thrown.

-- 
This is an automated message from the Apache Git Service.