This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new 3287b1d85 [Bugfix][connector-cdc-mysql] Fix listener not released when BinlogClient reuse (#5011)
3287b1d85 is described below

commit 3287b1d8521b9fda08fa4774cf958071031ac313
Author: happyboy1024 <137260654+happyboy1...@users.noreply.github.com>
AuthorDate: Tue Jul 11 16:28:01 2023 +0800

    [Bugfix][connector-cdc-mysql] Fix listener not released when BinlogClient reuse (#5011)
---
 .../reader/fetch/MySqlSourceFetchTaskContext.java | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index c6aebc8aa..0b6ea40ea 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
@@ -70,6 +71,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
 import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
@@ -326,13 +328,27 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
                 MySqlDatabaseSchema schema,
                 BinaryLogClient reusedBinaryLogClient) {
             super(config, schema);
-            this.reusedBinaryLogClient = reusedBinaryLogClient;
+            this.reusedBinaryLogClient = resetBinaryLogClient(reusedBinaryLogClient);
         }
 
         @Override
         public BinaryLogClient getBinaryLogClient() {
             return reusedBinaryLogClient;
         }
+
+        /** reset the listener of binaryLogClient before fetch task start. */
+        private BinaryLogClient resetBinaryLogClient(BinaryLogClient binaryLogClient) {
+            Optional<Object> eventListenersField =
+                    ReflectionUtils.getField(
+                            binaryLogClient, BinaryLogClient.class, "eventListeners");
+            eventListenersField.ifPresent(o -> ((List<BinaryLogClient.EventListener>) o).clear());
+            Optional<Object> lifecycleListeners =
+                    ReflectionUtils.getField(
+                            binaryLogClient, BinaryLogClient.class, "lifecycleListeners");
+            lifecycleListeners.ifPresent(
+                    o -> ((List<BinaryLogClient.LifecycleListener>) o).clear());
+            return binaryLogClient;
+        }
     }
 
     /** Copied from debezium for accessing here. */