yuxiqian commented on code in PR #3558:
URL: https://github.com/apache/flink-cdc/pull/3558#discussion_r1723028372


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java:
##########
@@ -221,7 +221,7 @@ private void applyAddColumnEvent(AddColumnEvent event) 
throws SchemaEvolveExcept
                         tableId.getSchemaName(), tableId.getTableName(), 
addFieldSchema);
             }
         } catch (Exception e) {
-            throw new SchemaEvolveException(event, e.getMessage(), e);
+            throw new SchemaEvolveException(event, "fail to apply add column 
event", e);

Review Comment:
   ```suggestion
               throw new SchemaEvolveException(event, "Failed to apply add 
column event", e);
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -437,6 +444,25 @@ private List<SchemaChangeEvent> 
lenientizeSchemaChangeEvent(SchemaChangeEvent ev
         }
     }
 
+    private boolean shouldIgnoreException(Exception exception) {
+
+        // only UnsupportedSchemaChangeEventException maybe ignore(depends on 
SchemaChangeBehavior)

Review Comment:
   Code flow and comments are confusing here. Something like
   
   ```java
   // Only `UnsupportedSchemaChangeEventException` could be ignored in 
TRY_EVOLVE mode.
   // In IGNORE mode, will never try to apply schema change events
   // In EVOLVE mode, such failure will not be tolerated
   // In EXCEPTION mode, an exception will be thrown once captured
   return (exception instanceof UnsupportedSchemaChangeEventException) && 
       (schemaChangeBehavior == TRY_EVOLVE)
   ```
   
   might be enough.



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java:
##########
@@ -1040,8 +1040,10 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, 
(short) 18)),
                                                 Column.physicalColumn(
                                                         "height", DOUBLE, 
"Height data")))));
         Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents))
+                .cause()
+                .cause()
                 .isExactlyInstanceOf(RuntimeException.class)
-                .hasMessageContaining("Failed to apply schema change");
+                .hasMessageContaining("failed to apply schema change");

Review Comment:
   Use `hasRootCauseInstanceOf` and `hasRootCauseMessage`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to