ruanhang1993 commented on code in PR #3463: URL: https://github.com/apache/flink-cdc/pull/3463#discussion_r1914284429
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java: ########## @@ -180,6 +199,7 @@ public void close() { statefulTaskContext.getBinaryLogClient().disconnect(); } Review Comment: We need to remove the earlier close calls made on statefulTaskContext. ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java: ########## @@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl private static final long READER_CLOSE_TIMEOUT = 30L; + public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) { + final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig); + final BinaryLogClient binaryLogClient = + createBinaryClient(sourceConfig.getDbzConfiguration()); + this.statefulTaskContext = + new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("binlog-reader-" + subTaskId).build(); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); + this.currentTaskRunning = true; + this.pureBinlogPhaseTables = new HashSet<>(); Review Comment: ```suggestion this(new StatefulTaskContext(...), subtaskId); ``` ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java: ########## @@ -180,6 +199,7 @@ public void close() { statefulTaskContext.getBinaryLogClient().disconnect(); } + statefulTaskContext.close(); Review Comment: ```suggestion if (statefulTaskContext != null) { statefulTaskContext.close(); } ``` ########## 
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java: ########## @@ -86,6 +92,19 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl private static final long READER_CLOSE_TIMEOUT = 30L; + public BinlogSplitReader(MySqlSourceConfig sourceConfig, int subTaskId) { Review Comment: subTaskId => subtaskId ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java: ########## @@ -378,15 +403,7 @@ private void setReadException(Throwable throwable) { public void close() { try { stopCurrentTask(); - if (statefulTaskContext.getConnection() != null) { - statefulTaskContext.getConnection().close(); - } - if (statefulTaskContext.getBinaryLogClient() != null) { - statefulTaskContext.getBinaryLogClient().disconnect(); - } - if (statefulTaskContext.getDatabaseSchema() != null) { - statefulTaskContext.getDatabaseSchema().close(); - } + statefulTaskContext.close(); Review Comment: ```suggestion if (statefulTaskContext != null) { statefulTaskContext.close(); } ``` ########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java: ########## @@ -93,6 +99,26 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecords, MySqlS private static final long READER_CLOSE_TIMEOUT = 30L; + public SnapshotSplitReader( + MySqlSourceConfig sourceConfig, int subtaskId, SnapshotPhaseHooks hooks) { + final MySqlConnection jdbcConnection = createMySqlConnection(sourceConfig); + final BinaryLogClient binaryLogClient = + createBinaryClient(sourceConfig.getDbzConfiguration()); + this.statefulTaskContext = + new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection); + ThreadFactory threadFactory = + new 
ThreadFactoryBuilder() + .setNameFormat("debezium-reader-" + subtaskId) + .setUncaughtExceptionHandler( + (thread, throwable) -> setReadException(throwable)) + .build(); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); + this.hooks = hooks; + this.currentTaskRunning = false; + this.hasNextElement = new AtomicBoolean(false); + this.reachEnd = new AtomicBoolean(false); Review Comment: ```suggestion this(new StatefulTaskContext(...), subtaskId, hooks); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org