Copilot commented on code in PR #4334:
URL: https://github.com/apache/flink-cdc/pull/4334#discussion_r2982075761
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -1179,6 +1186,7 @@ public void execute(
// Only when we reach the first BEGIN event will we start to skip events ...
skipEvent = false;
+ Throwable executionError = null;
Review Comment:
If an exception occurs in the main `try` and any `unregister*` call throws
in `finally`, the cleanup exception will replace the original failure, making
the root cause harder to diagnose. Consider using the `executionError` variable
to preserve the primary exception: record the original throwable, then wrap
`unregister*` in a try/catch and attach cleanup failures via `addSuppressed`
(or only throw cleanup failures when there was no primary error).
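
A minimal sketch of the pattern described above, not the PR's actual code. The class `PreservePrimaryError`, the `Cleanup` interface, and `runWithCleanup` are hypothetical names used only for illustration; each `Cleanup` stands in for one `unregister*` call.

```java
import java.util.List;

class PreservePrimaryError {

    /** Hypothetical stand-in for a single unregister* call. */
    interface Cleanup {
        void run() throws Exception;
    }

    static void runWithCleanup(Runnable body, List<Cleanup> cleanups) throws Exception {
        Throwable executionError = null;
        try {
            body.run();
        } catch (Throwable t) {
            // Record the primary failure so cleanup cannot replace it.
            executionError = t;
            throw t;
        } finally {
            Exception cleanupError = null;
            for (Cleanup cleanup : cleanups) {
                try {
                    cleanup.run();
                } catch (Exception e) {
                    if (executionError != null) {
                        // A primary failure exists: attach the cleanup failure to it.
                        executionError.addSuppressed(e);
                    } else if (cleanupError == null) {
                        cleanupError = e;
                    } else {
                        cleanupError.addSuppressed(e);
                    }
                }
            }
            // Only reached with a non-null value when there was no primary error.
            if (cleanupError != null) {
                throw cleanupError;
            }
        }
    }
}
```

With this shape, a failing cleanup step never hides the original exception, and every cleanup step still runs even if an earlier one throws.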
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java:
##########
@@ -1264,6 +1272,13 @@ public void execute(
} catch (Exception e) {
LOGGER.info("Exception while stopping binary log client", e);
}
+
+ client.unregisterEventListener(eventListener);
+ client.unregisterEventListener(metricsEventListener);
+ client.unregisterLifecycleListener(lifecycleListener);
+ if (logEventListener != null) {
+ client.unregisterEventListener(logEventListener);
Review Comment:
If an exception occurs in the main `try` and any `unregister*` call throws
in `finally`, the cleanup exception will replace the original failure, making
the root cause harder to diagnose. Consider using the `executionError` variable
to preserve the primary exception: record the original throwable, then wrap
`unregister*` in a try/catch and attach cleanup failures via `addSuppressed`
(or only throw cleanup failures when there was no primary error).
```suggestion
try {
    client.unregisterEventListener(eventListener);
} catch (Exception e) {
    LOGGER.info("Exception while unregistering event listener", e);
}
try {
    client.unregisterEventListener(metricsEventListener);
} catch (Exception e) {
    LOGGER.info("Exception while unregistering metrics event listener", e);
}
try {
    client.unregisterLifecycleListener(lifecycleListener);
} catch (Exception e) {
    LOGGER.info("Exception while unregistering lifecycle listener", e);
}
if (logEventListener != null) {
    try {
        client.unregisterEventListener(logEventListener);
    } catch (Exception e) {
        LOGGER.info("Exception while unregistering log event listener", e);
    }
```
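
The four near-identical try/catch blocks in the suggestion could also be folded into one small helper. This is only a sketch: `QuietCleanup`, `Action`, and `runQuietly` are hypothetical names, and `java.util.logging` is used here just to keep the example self-contained (the connector itself logs through `LOGGER`).

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class QuietCleanup {
    private static final Logger LOGGER = Logger.getLogger(QuietCleanup.class.getName());

    /** Hypothetical stand-in for one unregister* call. */
    interface Action {
        void run() throws Exception;
    }

    /**
     * Runs one cleanup action, logging and swallowing any exception so that
     * later cleanup steps still execute. Returns true if the action succeeded.
     */
    static boolean runQuietly(String description, Action action) {
        try {
            action.run();
            return true;
        } catch (Exception e) {
            LOGGER.log(Level.INFO, "Exception while " + description, e);
            return false;
        }
    }
}
```

A call site would then look roughly like `runQuietly("unregistering event listener", () -> client.unregisterEventListener(eventListener));`, with one line per listener.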
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java:
##########
@@ -586,6 +586,21 @@ void testMultipleSplitsWithBackfill() throws Exception {
"UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 103");
mySqlConnection.commit();
} else if (split.splitId().equals(tableId + ":1")) {
+ // To verify that FLINK-39315 is fixed, generate sufficient binlog events,
+ // so that the MySqlBinlogSplitReadTask runs long enough to exercise the
+ // context-running checks in binlog reading backfill phase.
+ for (int i = 0; i < 1000; i++) {
+ mySqlConnection.execute(
+ "UPDATE "
+ + tableId
+ + " SET address = 'Beijing' WHERE id = 106");
+ mySqlConnection.commit();
+ mySqlConnection.execute(
+ "UPDATE "
+ + tableId
+ + " SET address = 'Shanghai' WHERE id = 106");
+ mySqlConnection.commit();
+ }
Review Comment:
This adds 2000 updates with 2000 commits, which can significantly slow down
the unit test and increase flakiness on slower CI environments. Consider
batching commits (e.g., commit every N updates or run the loop in a single
transaction), reducing iterations to the minimum that still reproduces the
issue, or deriving the iteration count from a constant/system property so CI
can tune it if needed.
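
One way the batching and tunable count could look, as a sketch only. `BatchedUpdates`, its `Connection` interface, `generateBinlogEvents`, and the `backfill.test.iterations` system property are all hypothetical; the interface mirrors only the `execute`/`commit` calls used in the test. Whether fewer, larger transactions still reproduce FLINK-39315 is an assumption that would need to be checked against the actual test.

```java
class BatchedUpdates {

    /** Hypothetical minimal view of the connection used by the test. */
    interface Connection {
        void execute(String sql);
        void commit();
    }

    /** Iteration count, overridable via a (hypothetical) system property. */
    static final int ITERATIONS = Integer.getInteger("backfill.test.iterations", 1000);

    /**
     * Issues two updates per iteration and commits once every commitEvery
     * statements instead of after each one. Returns the number of commits.
     */
    static int generateBinlogEvents(
            Connection conn, String tableId, int iterations, int commitEvery) {
        int commits = 0;
        int pending = 0;
        for (int i = 0; i < iterations; i++) {
            conn.execute("UPDATE " + tableId + " SET address = 'Beijing' WHERE id = 106");
            conn.execute("UPDATE " + tableId + " SET address = 'Shanghai' WHERE id = 106");
            pending += 2;
            if (pending >= commitEvery) {
                conn.commit();
                commits++;
                pending = 0;
            }
        }
        // Flush any statements left over from the last partial batch.
        if (pending > 0) {
            conn.commit();
            commits++;
        }
        return commits;
    }
}
```

With 1000 iterations and `commitEvery` set to 100, this issues the same 2000 updates but only 20 commits.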