libenchao commented on code in PR #25919:
URL: https://github.com/apache/flink/pull/25919#discussion_r1928378346


##########
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java:
##########
@@ -214,6 +215,7 @@ public void deserialize(@Nullable byte[] message, 
Collector<RowData> out) throws
         if (message == null || message.length == 0) {
             return;
         }
+        List<GenericRowData> genericRowDataList = new ArrayList<>();

Review Comment:
   We can add a transient field to `CacalJsonDeserializationSchema` to avoid 
creating new list for every record, it's more efficient, WDYT?



##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -73,10 +76,69 @@ void testFilteringTables() throws Exception {
                                 
InternalTypeInfo.of(PHYSICAL_DATA_TYPE.getLogicalType()))
                         .setDatabase("^my.*")
                         .setTable("^prod.*")
+                        .setIgnoreParseErrors(true)

Review Comment:
   What's the purpose of enabling ignore-parse-error for this test case?



##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
             // do nothing
         }
     }
+
+    private static class SpecialCollector implements Collector<RowData> {

Review Comment:
   How about naming it `ThrowingExceptionCollector`?



##########
flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java:
##########
@@ -319,4 +381,22 @@ public void close() {
             // do nothing
         }
     }
+
+    private static class SpecialCollector implements Collector<RowData> {
+
+        private final List<RowData> list = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            if (record.getRowKind().equals(RowKind.DELETE) && record.getInt(0) 
== 103) {

Review Comment:
   The criteria is too specific, could we just throw exception directly no 
matter what the data is? We can only test the case that when exception occurs 
in chained downstream operator, the exception is correctly thrown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to