ruanhang1993 commented on code in PR #3463:
URL: https://github.com/apache/flink-cdc/pull/3463#discussion_r1914284429


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -180,6 +199,7 @@ public void close() {
                 statefulTaskContext.getBinaryLogClient().disconnect();
             }

Review Comment:
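   We need to delete the earlier close calls on the resources held by statefulTaskContext (for example the `getBinaryLogClient().disconnect()` call above), now that `statefulTaskContext.close()` is invoked.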



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
+    public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) {
+        final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
+        final BinaryLogClient binaryLogClient =
+                createBinaryClient(sourceConfig.getDbzConfiguration());
+        this.statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build();
+        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+        this.currentTaskRunning = true;
+        this.pureBinlogPhaseTables = new HashSet<>();

Review Comment:
   ```suggestion
           this(new StatefulTaskContext(...), subtaskId);
   ```
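
A minimal sketch of the constructor delegation this suggestion points at, using hypothetical stand-in classes (`SketchConfig`, `SketchContext`, `SketchReader`) rather than the real Flink CDC types. It assumes a primary constructor that accepts an already-built context exists; the convenience constructor only builds the context and forwards to it, so field initialisation lives in one place.

```java
// Hypothetical stand-ins for illustration only; not the real Flink CDC classes.
class SketchConfig {
    final String hostname;

    SketchConfig(String hostname) {
        this.hostname = hostname;
    }
}

class SketchContext {
    final SketchConfig config;

    SketchContext(SketchConfig config) {
        this.config = config;
    }
}

class SketchReader {
    final SketchContext context;
    final String threadName;
    final boolean currentTaskRunning;

    // Convenience constructor: builds the context, then delegates via this(...).
    SketchReader(SketchConfig config, int subtaskId) {
        this(new SketchContext(config), subtaskId);
    }

    // Primary constructor: the only place where fields are assigned.
    SketchReader(SketchContext context, int subtaskId) {
        this.context = context;
        this.threadName = "binlog-reader-" + subtaskId;
        this.currentTaskRunning = true;
    }
}
```

With this shape, a later change to the initialisation logic only has to touch the primary constructor.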



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -180,6 +199,7 @@ public void close() {
                 statefulTaskContext.getBinaryLogClient().disconnect();
             }
 
+            statefulTaskContext.close();

Review Comment:
   ```suggestion
               if (statefulTaskContext != null) {
                   statefulTaskContext.close();
               }
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java:
##########
@@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
+    public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) {

Review Comment:
   Rename `subTaskId` to `subtaskId`.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########
@@ -378,15 +403,7 @@ private void setReadException(Throwable throwable) {
     public void close() {
         try {
             stopCurrentTask();
-            if (statefulTaskContext.getConnection() != null) {
-                statefulTaskContext.getConnection().close();
-            }
-            if (statefulTaskContext.getBinaryLogClient() != null) {
-                statefulTaskContext.getBinaryLogClient().disconnect();
-            }
-            if (statefulTaskContext.getDatabaseSchema() != null) {
-                statefulTaskContext.getDatabaseSchema().close();
-            }
+            statefulTaskContext.close();

Review Comment:
   ```suggestion
               if (statefulTaskContext != null) {
                   statefulTaskContext.close();
               }
   ```
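
A minimal sketch of the cleanup pattern discussed here: the context closes the resources it owns, and the reader guards against a context that was never assigned. All class names (`SketchResource`, `SketchTaskContext`, `SketchSplitReader`) are hypothetical stand-ins, and the per-resource null checks inside the context are an assumption about how such a `close()` could be written, not a description of the real `StatefulTaskContext`.

```java
// Hypothetical stand-ins for illustration only; not the real Flink CDC classes.
class SketchResource implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

class SketchTaskContext implements AutoCloseable {
    // Any of these may be null if set-up stopped part-way (assumption).
    private final SketchResource connection;
    private final SketchResource binlogClient;
    private final SketchResource schema;

    SketchTaskContext(
            SketchResource connection, SketchResource binlogClient, SketchResource schema) {
        this.connection = connection;
        this.binlogClient = binlogClient;
        this.schema = schema;
    }

    // The context owns its resources, so callers need no per-resource cleanup.
    @Override
    public void close() {
        if (connection != null) {
            connection.close();
        }
        if (binlogClient != null) {
            binlogClient.close();
        }
        if (schema != null) {
            schema.close();
        }
    }
}

class SketchSplitReader implements AutoCloseable {
    private final SketchTaskContext context; // null if the reader was never fully set up

    SketchSplitReader(SketchTaskContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Null guard, as in the suggestion: close() must not throw on a missing context.
        if (context != null) {
            context.close();
        }
    }
}
```

Calling `close()` on a reader built with a `null` context is then a no-op rather than a `NullPointerException`.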



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java:
##########
@@ -93,6 +99,26 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS
 
     private static final long READER_CLOSE_TIMEOUT = 30L;
 
+    public SnapshotSplitReader(
+            MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) {
+        final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig);
+        final BinaryLogClient binaryLogClient =
+                createBinaryClient(sourceConfig.getDbzConfiguration());
+        this.statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+        ThreadFactory threadFactory =
+                new ThreadFactoryBuilder()
+                        .setNameFormat("debezium-reader-" + subtaskId)
+                        .setUncaughtExceptionHandler(
+                                (thread, throwable) -> setReadException(throwable))
+                        .build();
+        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+        this.hooks = hooks;
+        this.currentTaskRunning = false;
+        this.hasNextElement = new AtomicBoolean(false);
+        this.reachEnd = new AtomicBoolean(false);

Review Comment:
   ```suggestion
           this(new StatefulTaskContext(...), subtaskId, hooks);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
