This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3287b1d85 [Bugfix][connector-cdc-mysql] Fix listener not released when 
BinlogClient reuse (#5011)
3287b1d85 is described below

commit 3287b1d8521b9fda08fa4774cf958071031ac313
Author: happyboy1024 <137260654+happyboy1...@users.noreply.github.com>
AuthorDate: Tue Jul 11 16:28:01 2023 +0800

    [Bugfix][connector-cdc-mysql] Fix listener not released when BinlogClient 
reuse (#5011)
---
 .../reader/fetch/MySqlSourceFetchTaskContext.java      | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index c6aebc8aa..0b6ea40ea 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -18,6 +18,7 @@
 package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import 
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
@@ -70,6 +71,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
@@ -326,13 +328,27 @@ public class MySqlSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                 MySqlDatabaseSchema schema,
                 BinaryLogClient reusedBinaryLogClient) {
             super(config, schema);
-            this.reusedBinaryLogClient = reusedBinaryLogClient;
+            this.reusedBinaryLogClient = 
resetBinaryLogClient(reusedBinaryLogClient);
         }
 
         @Override
         public BinaryLogClient getBinaryLogClient() {
             return reusedBinaryLogClient;
         }
+
+        /** Clears the client's "eventListeners" and "lifecycleListeners" lists (looked up via ReflectionUtils, skipped if absent) before the fetch task starts; returns the same client. */
+        private BinaryLogClient resetBinaryLogClient(BinaryLogClient 
binaryLogClient) {
+            Optional<Object> eventListenersField =
+                    ReflectionUtils.getField(
+                            binaryLogClient, BinaryLogClient.class, 
"eventListeners");
+            eventListenersField.ifPresent(o -> 
((List<BinaryLogClient.EventListener>) o).clear());
+            Optional<Object> lifecycleListeners =
+                    ReflectionUtils.getField(
+                            binaryLogClient, BinaryLogClient.class, 
"lifecycleListeners");
+            lifecycleListeners.ifPresent(
+                    o -> ((List<BinaryLogClient.LifecycleListener>) 
o).clear());
+            return binaryLogClient;
+        }
     }
 
     /** Copied from debezium for accessing here. */

Reply via email to