Copilot commented on code in PR #4334:
URL: https://github.com/apache/flink-cdc/pull/4334#discussion_r2982075761


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -1179,6 +1186,7 @@ public void execute(
         // Only when we reach the first BEGIN event will we start to skip 
events ...
         skipEvent = false;
 
+        Throwable executionError = null;

Review Comment:
   If an exception occurs in the main `try` and any `unregister*` call throws 
in `finally`, the cleanup exception will replace the original failure, making 
the root cause harder to diagnose. Consider using the `executionError` variable 
to preserve the primary exception: record the original throwable, then wrap 
`unregister*` in a try/catch and attach cleanup failures via `addSuppressed` 
(or only throw cleanup failures when there was no primary error).



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -1264,6 +1272,13 @@ public void execute(
             } catch (Exception e) {
                 LOGGER.info("Exception while stopping binary log client", e);
             }
+
+            client.unregisterEventListener(eventListener);
+            client.unregisterEventListener(metricsEventListener);
+            client.unregisterLifecycleListener(lifecycleListener);
+            if (logEventListener != null) {
+                client.unregisterEventListener(logEventListener);

Review Comment:
   If an exception occurs in the main `try` and any `unregister*` call throws 
in `finally`, the cleanup exception will replace the original failure, making 
the root cause harder to diagnose. Consider using the `executionError` variable 
to preserve the primary exception: record the original throwable, then wrap 
`unregister*` in a try/catch and attach cleanup failures via `addSuppressed` 
(or only throw cleanup failures when there was no primary error).
   ```suggestion
               try {
                   client.unregisterEventListener(eventListener);
               } catch (Exception e) {
                   LOGGER.info("Exception while unregistering event listener", 
e);
               }
               try {
                   client.unregisterEventListener(metricsEventListener);
               } catch (Exception e) {
                   LOGGER.info("Exception while unregistering metrics event 
listener", e);
               }
               try {
                   client.unregisterLifecycleListener(lifecycleListener);
               } catch (Exception e) {
                   LOGGER.info("Exception while unregistering lifecycle 
listener", e);
               }
               if (logEventListener != null) {
                   try {
                       client.unregisterEventListener(logEventListener);
                   } catch (Exception e) {
                       LOGGER.info("Exception while unregistering log event 
listener", e);
                   }
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java:
##########
@@ -586,6 +586,21 @@ void testMultipleSplitsWithBackfill() throws Exception {
                                 "UPDATE " + tableId + " SET address = 
'Beijing' WHERE id = 103");
                         mySqlConnection.commit();
                     } else if (split.splitId().equals(tableId + ":1")) {
+                        // To verify that FLINK-39315 is fixed, generate 
sufficient binlog events,
+                        // so that the MySqlBinlogSplitReadTask runs long 
enough to exercise the
+                        // context-running checks in binlog reading backfill 
phase.
+                        for (int i = 0; i < 1000; i++) {
+                            mySqlConnection.execute(
+                                    "UPDATE "
+                                            + tableId
+                                            + " SET address = 'Beijing' WHERE 
id = 106");
+                            mySqlConnection.commit();
+                            mySqlConnection.execute(
+                                    "UPDATE "
+                                            + tableId
+                                            + " SET address = 'Shanghai' WHERE 
id = 106");
+                            mySqlConnection.commit();
+                        }

Review Comment:
   This adds 2000 updates with 2000 commits, which can significantly slow down 
the unit test and increase flakiness on slower CI environments. Consider 
batching commits (e.g., commit every N updates or run the loop in a single 
transaction), reducing iterations to the minimum that still reproduces the 
issue, or deriving the iteration count from a constant/system property so CI 
can tune it if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to