This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ec2f4c02cf8 Add more test cases on MySQLBinlogClientTest (#37435)
ec2f4c02cf8 is described below

commit ec2f4c02cf8dd31d0dfe967fa03c4abeef269f3e
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 19 13:10:38 2025 +0800

    Add more test cases on MySQLBinlogClientTest (#37435)
    
    * Add more test cases on MySQLBinlogClientTest
    
    * Add more test cases on MySQLBinlogClientTest
    
    * Add more test cases on MySQLBinlogClientTest
---
 .../incremental/client/MySQLBinlogClient.java      |   9 +-
 .../incremental/client/MySQLBinlogClientTest.java  | 420 ++++++++++++++++++---
 2 files changed, 378 insertions(+), 51 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 2db6ee0c6e8..08a5b30aed8 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -49,6 +49,7 @@ import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.binlog.M
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.generic.MySQLOKPacket;
+import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.infra.util.json.JsonUtils;
 import org.apache.shardingsphere.proxy.frontend.netty.ChannelAttrInitializer;
@@ -143,9 +144,7 @@ public final class MySQLBinlogClient {
         resetSequenceID();
         channel.writeAndFlush(comQueryPacket);
         Optional<MySQLOKPacket> packet = 
waitExpectedResponse(MySQLOKPacket.class);
-        if (!packet.isPresent()) {
-            throw new PipelineInternalException("Could not get MySQL OK 
packet");
-        }
+        ShardingSpherePreconditions.checkState(packet.isPresent(), () -> new 
PipelineInternalException("Could not get MySQL OK packet"));
         return (int) packet.get().getAffectedRows();
     }
     
@@ -162,9 +161,7 @@ public final class MySQLBinlogClient {
         resetSequenceID();
         channel.writeAndFlush(comQueryPacket);
         Optional<InternalResultSet> result = 
waitExpectedResponse(InternalResultSet.class);
-        if (!result.isPresent()) {
-            throw new PipelineInternalException("Could not get MySQL 
FieldCount/ColumnDefinition/TextResultSetRow packet");
-        }
+        ShardingSpherePreconditions.checkState(result.isPresent(), () -> new 
PipelineInternalException("Could not get MySQL 
FieldCount/ColumnDefinition/TextResultSetRow packet"));
         return result.get();
     }
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
index a5906cc71c5..ac76e6eeb8a 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
@@ -17,40 +17,82 @@
 
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client;
 
+import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.Attribute;
+import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
 import 
org.apache.shardingsphere.database.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
+import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.MySQLTextResultSetRowPacket;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.query.text.query.MySQLComQueryPacket;
+import 
org.apache.shardingsphere.database.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.generic.MySQLOKPacket;
+import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.MockedConstruction;
 import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.net.InetSocketAddress;
+import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
+@SuppressWarnings("ProhibitedExceptionDeclared")
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
 class MySQLBinlogClientTest {
@@ -64,69 +106,364 @@ class MySQLBinlogClientTest {
     @Mock
     private ChannelFuture channelFuture;
     
+    private final ConnectInfo connectInfo = new ConnectInfo(1, "host", 3306, 
"username", "password");
+    
     private MySQLBinlogClient client;
     
-    @SuppressWarnings("unchecked")
+    private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+    
     @BeforeEach
     void setUp() {
-        client = new MySQLBinlogClient(new ConnectInfo(1, "host", 3306, 
"username", "password"), false);
+        client = new MySQLBinlogClient(connectInfo, false);
         when(channel.pipeline()).thenReturn(pipeline);
         when(channel.isOpen()).thenReturn(true);
         when(channel.close()).thenReturn(channelFuture);
         when(channel.localAddress()).thenReturn(new InetSocketAddress("host", 
3306));
         
when(channel.attr(MySQLConstants.SEQUENCE_ID_ATTRIBUTE_KEY)).thenReturn(mock(Attribute.class));
         
when(channel.attr(MySQLConstants.SEQUENCE_ID_ATTRIBUTE_KEY).get()).thenReturn(new
 AtomicInteger());
+        when(pipeline.addLast(any(ChannelHandler.class))).thenReturn(pipeline);
+        when(pipeline.fireChannelRegistered()).thenReturn(pipeline);
+    }
+    
+    @AfterEach
+    void tearDown() {
+        eventLoopGroup.shutdownGracefully();
+        Thread.interrupted();
     }
     
     @Test
-    void assertConnect() throws ReflectiveOperationException {
+    void assertConnect() throws Exception {
         MySQLServerVersion expected = new MySQLServerVersion("5.5.0-log");
+        AtomicReference<ChannelInitializer<SocketChannel>> initializer = new 
AtomicReference<>();
         mockChannelResponse(expected);
-        client.connect();
+        try (MockedConstruction<Bootstrap> ignored = 
mockConstruction(Bootstrap.class, (mock, context) -> {
+            when(mock.group(any())).thenReturn(mock);
+            when(mock.channel(any())).thenReturn(mock);
+            when(mock.option(any(), any())).thenReturn(mock);
+            when(mock.handler(any())).thenAnswer(invocation -> {
+                initializer.set(invocation.getArgument(0));
+                return mock;
+            });
+            when(mock.connect(anyString(), 
anyInt())).thenReturn(channelFuture);
+        })) {
+            client.connect();
+        }
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        when(context.executor()).thenReturn(eventLoopGroup.next());
+        when(context.pipeline()).thenReturn(pipeline);
+        initializer.get().channelRegistered(context);
         MySQLServerVersion actual = (MySQLServerVersion) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("serverVersion"),
 client);
         assertThat(actual, is(expected));
+        client.closeChannel();
     }
     
     @Test
-    void assertExecute() throws ReflectiveOperationException {
+    void assertExecuteSuccess() {
+        prepareClientChannel();
         mockChannelResponse(new MySQLOKPacket(0));
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("eventLoopGroup"),
 client, new NioEventLoopGroup(1));
-        assertTrue(client.execute(""));
-        
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
+        assertTrue(client.execute("sql"));
+        verify(channel).writeAndFlush(any(MySQLComQueryPacket.class));
     }
     
     @Test
-    void assertExecuteUpdate() throws ReflectiveOperationException {
-        MySQLOKPacket expected = new MySQLOKPacket(10L, 0L, 0);
-        
Plugins.getMemberAccessor().set(MySQLOKPacket.class.getDeclaredField("affectedRows"),
 expected, 10L);
-        mockChannelResponse(expected);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("eventLoopGroup"),
 client, new NioEventLoopGroup(1));
-        assertThat(client.executeUpdate(""), is(10));
-        
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
+    void assertNullResponseHandling() {
+        prepareClientChannel();
+        mockChannelResponse(null);
+        assertFalse(client.execute("sql"));
+        PipelineInternalException updateException = 
assertThrows(PipelineInternalException.class, () -> 
client.executeUpdate("update"));
+        assertThat(updateException.getMessage(), is("Could not get MySQL OK 
packet"));
+        PipelineInternalException queryException = 
assertThrows(PipelineInternalException.class, () -> 
client.executeQuery("query"));
+        assertThat(queryException.getMessage(), is("Could not get MySQL 
FieldCount/ColumnDefinition/TextResultSetRow packet"));
+    }
+    
+    @Test
+    void assertExecuteThrowsWhenErrorResponse() {
+        prepareClientChannel();
+        mockChannelResponse(new MySQLErrPacket(new SQLException("err", 
"state", 1)));
+        PipelineInternalException ex = 
assertThrows(PipelineInternalException.class, () -> client.execute("sql"));
+        assertThat(ex.getMessage(), is("err"));
+    }
+    
+    @Test
+    void assertExecuteQueryThrowsWhenUnexpectedResponse() {
+        prepareClientChannel();
+        mockChannelResponse(new Object());
+        PipelineInternalException ex = 
assertThrows(PipelineInternalException.class, () -> 
client.executeQuery("query"));
+        assertThat(ex.getMessage(), is("unexpected response type"));
     }
     
     @Test
-    void assertExecuteQuery() throws ReflectiveOperationException {
+    void assertExecuteUpdateReturnsAffectedRows() {
+        prepareClientChannel();
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            Promise<Object> callback = (Promise<Object>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("responseCallback"),
 client);
+            callback.setSuccess(new MySQLOKPacket(5L, 0L, 0));
+            return null;
+        }).when(channel).writeAndFlush(any(MySQLComQueryPacket.class));
+        assertThat(client.executeUpdate("update"), is(5));
+    }
+    
+    @Test
+    void assertExecuteQueryThrowsWhenInterrupted() throws Exception {
+        prepareClientChannel();
+        Promise<Object> interruptedPromise = mock(Promise.class);
+        when(interruptedPromise.get(5L, TimeUnit.SECONDS)).thenThrow(new 
InterruptedException());
+        doAnswer(invocation -> {
+            setResponseCallback(interruptedPromise);
+            return null;
+        }).when(channel).writeAndFlush(any(MySQLComQueryPacket.class));
+        assertThrows(PipelineInternalException.class, () -> 
client.executeQuery("query"));
+        assertTrue(Thread.currentThread().isInterrupted());
+    }
+    
+    @Test
+    void assertExecuteThrowsWhenTimeout() throws Exception {
+        prepareClientChannel();
+        Promise<Object> timeoutPromise = mock(Promise.class);
+        when(timeoutPromise.get(5L, TimeUnit.SECONDS)).thenThrow(new 
TimeoutException());
+        doAnswer(invocation -> {
+            setResponseCallback(timeoutPromise);
+            return null;
+        }).when(channel).writeAndFlush(any(MySQLComQueryPacket.class));
+        PipelineInternalException actual = 
assertThrows(PipelineInternalException.class, () -> client.execute("sql"));
+        assertThat(actual.getCause(), isA(TimeoutException.class));
+    }
+    
+    @Test
+    void assertExecuteQueryReturnsResultSet() {
+        prepareClientChannel();
         InternalResultSet expected = new InternalResultSet(null);
-        mockChannelResponse(expected);
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            Promise<Object> callback = (Promise<Object>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("responseCallback"),
 client);
+            callback.setSuccess(expected);
+            return null;
+        }).when(channel).writeAndFlush(any(MySQLComQueryPacket.class));
+        assertThat(client.executeQuery("select"), is(expected));
+    }
+    
+    @Test
+    void assertSubscribeChecksumLengthBranches() {
+        client = createClientMock();
+        prepareClientChannel();
+        setServerVersion("5.6.0");
+        doReturn(true).when(client).execute(anyString());
+        
doReturn(createResultSet("NONE")).doReturn(createResultSet("CRC32")).when(client).executeQuery(anyString());
+        
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
 -> {
+            Promise<Object> callback = new 
DefaultPromise<>(eventLoopGroup.next());
+            callback.setSuccess(new MySQLOKPacket(0));
+            setResponseCallback(callback);
+            return null;
+        });
+        doAnswer(invocation -> 
null).when(channel).writeAndFlush(any(MySQLComBinlogDumpCommandPacket.class));
+        client.subscribe("binlog-000001", 4L);
+        client.subscribe("binlog-000002", 8L);
+        ArgumentCaptor<ChannelHandler> captor = 
ArgumentCaptor.forClass(ChannelHandler.class);
+        verify(pipeline, times(4)).addLast(captor.capture());
+        List<Integer> checksumLengths = captor.getAllValues().stream()
+                .filter(each -> each instanceof MySQLBinlogEventPacketDecoder)
+                .map(each -> getChecksumLength((MySQLBinlogEventPacketDecoder) 
each))
+                .collect(Collectors.toList());
+        assertThat(checksumLengths, is(Arrays.asList(0, 4)));
+    }
+    
+    @Test
+    void assertSubscribeUnsupportedChecksumThrows() {
+        client = createClientMock();
+        prepareClientChannel();
+        setServerVersion("5.6.0");
+        doReturn(true).when(client).execute(anyString());
+        
doReturn(createResultSet("SHA1")).when(client).executeQuery(anyString());
+        
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
 -> {
+            Promise<Object> callback = new 
DefaultPromise<>(eventLoopGroup.next());
+            callback.setSuccess(new MySQLOKPacket(0));
+            setResponseCallback(callback);
+            return null;
+        });
+        assertThrows(UnsupportedSQLOperationException.class, () -> 
client.subscribe("binlog", 4L));
+    }
+    
+    @Test
+    void assertSubscribeBelow56UsesZeroChecksum() {
+        client = createClientMock();
+        prepareClientChannel();
+        setServerVersion("5.5.0");
+        doReturn(true).when(client).execute(anyString());
+        
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
 -> {
+            Promise<Object> callback = new 
DefaultPromise<>(eventLoopGroup.next());
+            callback.setSuccess(new MySQLOKPacket(0));
+            setResponseCallback(callback);
+            return null;
+        });
+        doAnswer(invocation -> 
null).when(channel).writeAndFlush(any(MySQLComBinlogDumpCommandPacket.class));
+        client.subscribe("binlog-000004", 16L);
+        ArgumentCaptor<ChannelHandler> captor = 
ArgumentCaptor.forClass(ChannelHandler.class);
+        verify(pipeline, times(2)).addLast(captor.capture());
+        MySQLBinlogEventPacketDecoder decoder = captor.getAllValues().stream()
+                .filter(each -> each instanceof MySQLBinlogEventPacketDecoder)
+                
.map(MySQLBinlogEventPacketDecoder.class::cast).findFirst().orElseThrow(IllegalStateException::new);
+        assertThat(getChecksumLength(decoder), is(0));
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    void assertPollBranches() throws InterruptedException, 
ReflectiveOperationException {
+        setRunning(false);
+        assertThat(client.poll(), is(Collections.emptyList()));
+        setRunning(true);
+        assertThat(client.poll(), is(Collections.emptyList()));
+        List<MySQLBaseBinlogEvent> events = Collections.singletonList(new 
PlaceholderBinlogEvent("binlog", 4L, 1L));
+        ((ArrayBlockingQueue<List<MySQLBaseBinlogEvent>>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("blockingEventQueue"),
 client)).put(events);
+        assertThat(client.poll(), is(events));
+        setRunning(true);
+        Thread.currentThread().interrupt();
+        assertThat(client.poll(), is(Collections.emptyList()));
+    }
+    
+    @Test
+    void assertCloseChannelWhenChannelUnavailable() {
+        assertFalse(client.closeChannel().isPresent());
+    }
+    
+    @Test
+    void assertCloseChannelWithoutEventLoopGroup() throws Exception {
         
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("eventLoopGroup"),
 client, new NioEventLoopGroup(1));
-        assertThat(client.executeQuery(""), is(expected));
-        
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComQueryPacket.class));
+        Optional<ChannelFuture> actual = client.closeChannel();
+        assertTrue(actual.isPresent());
+        verify(channel).close();
+    }
+    
+    @Test
+    void assertMySQLCommandResponseHandlerBranches() throws Exception {
+        AtomicReference<ChannelInitializer<SocketChannel>> initializer = new 
AtomicReference<>();
+        SocketChannel socketChannel = mock(SocketChannel.class, 
RETURNS_DEEP_STUBS);
+        when(socketChannel.pipeline()).thenReturn(pipeline);
+        when(channelFuture.channel()).thenReturn(channel);
+        mockChannelResponse(new MySQLServerVersion("5.5.0-log"));
+        try (MockedConstruction<Bootstrap> ignored = 
mockConstruction(Bootstrap.class, (mock, context) -> {
+            when(mock.group(any())).thenReturn(mock);
+            when(mock.channel(any())).thenReturn(mock);
+            when(mock.option(any(), any())).thenReturn(mock);
+            when(mock.handler(any())).thenAnswer(invocation -> {
+                initializer.set(invocation.getArgument(0));
+                return mock;
+            });
+            when(mock.connect(anyString(), 
anyInt())).thenReturn(channelFuture);
+        })) {
+            client.connect();
+        }
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class);
+        when(context.executor()).thenReturn(eventLoopGroup.next());
+        when(context.channel()).thenReturn(socketChannel);
+        when(context.pipeline()).thenReturn(pipeline);
+        initializer.get().channelRegistered(context);
+        ArgumentCaptor<ChannelHandler> captor = 
ArgumentCaptor.forClass(ChannelHandler.class);
+        verify(pipeline, times(7)).addLast(captor.capture());
+        ChannelInboundHandlerAdapter handler = captor.getAllValues().stream()
+                .filter(each -> 
"MySQLCommandResponseHandler".equals(each.getClass().getSimpleName()))
+                .map(each -> (ChannelInboundHandlerAdapter) 
each).findFirst().orElseThrow(IllegalStateException::new);
+        Promise<Object> callback = new DefaultPromise<>(eventLoopGroup.next());
+        setResponseCallback(callback);
+        handler.channelRead(mock(ChannelHandlerContext.class), new Object());
+        assertTrue(callback.isSuccess());
+        callback = new DefaultPromise<>(eventLoopGroup.next());
+        setResponseCallback(callback);
+        handler.exceptionCaught(mock(ChannelHandlerContext.class), new 
RuntimeException("ex"));
+        assertTrue(callback.isDone());
+        setResponseCallback(null);
+        handler.channelRead(mock(ChannelHandlerContext.class), new Object());
+        handler.exceptionCaught(mock(ChannelHandlerContext.class), new 
RuntimeException("ex"));
+        client.closeChannel();
     }
     
     @Test
-    void assertSubscribeBelow56Version() throws ReflectiveOperationException {
-        MySQLServerVersion serverInfo = new MySQLServerVersion("5.5.0-log");
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("serverVersion"),
 client, serverInfo);
+    void assertMySQLBinlogEventHandlerBranches() throws Exception {
+        client = createClientMock();
+        prepareClientChannel();
+        setServerVersion("5.6.0");
+        doReturn(true).when(client).execute(anyString());
+        
doReturn(createResultSet("CRC32")).when(client).executeQuery(anyString());
+        
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
 -> {
+            Promise<Object> callback = new 
DefaultPromise<>(eventLoopGroup.next());
+            callback.setSuccess(new MySQLOKPacket(0));
+            setResponseCallback(callback);
+            return null;
+        });
+        doAnswer(invocation -> 
null).when(channel).writeAndFlush(any(MySQLComBinlogDumpCommandPacket.class));
+        client.subscribe("binlog-000003", 12L);
+        ArgumentCaptor<ChannelHandler> captor = 
ArgumentCaptor.forClass(ChannelHandler.class);
+        verify(pipeline, times(2)).addLast(captor.capture());
+        ChannelInboundHandlerAdapter handler = captor.getAllValues().stream()
+                .filter(each -> 
"MySQLBinlogEventHandler".equals(each.getClass().getSimpleName()))
+                .map(each -> (ChannelInboundHandlerAdapter) 
each).findFirst().orElseThrow(IllegalStateException::new);
+        setRunning(false);
+        handler.channelRead(mock(ChannelHandlerContext.class), new Object());
+        setRunning(true);
+        handler.channelRead(mock(ChannelHandlerContext.class), 
Collections.emptyList());
+        PlaceholderBinlogEvent firstEvent = new 
PlaceholderBinlogEvent("binlog-000003", 12L, 1L);
+        PlaceholderBinlogEvent secondEvent = new 
PlaceholderBinlogEvent("binlog-000003", 20L, 2L);
+        handler.channelRead(mock(ChannelHandlerContext.class), 
Arrays.asList(firstEvent, secondEvent));
+        handler.channelRead(mock(ChannelHandlerContext.class), new 
PlaceholderBinlogEvent("binlog-000003", 30L, 3L));
+        handler.exceptionCaught(mock(ChannelHandlerContext.class), new 
RuntimeException("binlog"));
+        setRunning(false);
+        handler.channelInactive(mock(ChannelHandlerContext.class));
+        setRunning(true);
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicInteger connectAttempts = new AtomicInteger();
+        doAnswer(invocation -> {
+            if (connectAttempts.getAndIncrement() == 0) {
+                throw new RuntimeException("fail");
+            }
+            return null;
+        }).when(client).connect();
+        doAnswer(invocation -> {
+            latch.countDown();
+            return null;
+        }).when(client).subscribe(anyString(), anyLong());
+        handler.channelInactive(mock(ChannelHandlerContext.class));
+        handler.channelInactive(mock(ChannelHandlerContext.class));
+        assertTrue(latch.await(3L, TimeUnit.SECONDS));
+        AtomicBoolean reconnectRequested = (AtomicBoolean) 
Plugins.getMemberAccessor().get(handler.getClass().getDeclaredField("reconnectRequested"),
 handler);
+        reconnectRequested.set(true);
+        handler.channelInactive(mock(ChannelHandlerContext.class));
+    }
+    
+    private InternalResultSet createResultSet(final String checksum) {
+        InternalResultSet result = new InternalResultSet(null);
+        result.getFieldValues().add(new 
MySQLTextResultSetRowPacket(Collections.singletonList(checksum)));
+        return result;
+    }
+    
+    private MySQLBinlogClient createClientMock() {
+        return mock(MySQLBinlogClient.class, 
withSettings().useConstructor(connectInfo, 
false).defaultAnswer(CALLS_REAL_METHODS));
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void prepareClientChannel() {
         
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("eventLoopGroup"),
 client, new NioEventLoopGroup(1));
-        mockChannelResponse(new MySQLOKPacket(0));
-        client.subscribe("", 4L);
-        
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComRegisterSlaveCommandPacket.class));
-        
verify(channel).writeAndFlush(ArgumentMatchers.any(MySQLComBinlogDumpCommandPacket.class));
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("eventLoopGroup"),
 client, eventLoopGroup);
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setServerVersion(final String version) {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("serverVersion"),
 client, new MySQLServerVersion(version));
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setResponseCallback(final Promise<Object> promise) {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("responseCallback"),
 client, promise);
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setRunning(final boolean value) {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("running"),
 client, value);
+    }
+    
+    @SneakyThrows(ReflectiveOperationException.class)
+    private int getChecksumLength(final MySQLBinlogEventPacketDecoder decoder) 
{
+        MySQLBinlogContext binlogContext = (MySQLBinlogContext) 
Plugins.getMemberAccessor().get(MySQLBinlogEventPacketDecoder.class.getDeclaredField("binlogContext"),
 decoder);
+        return binlogContext.getChecksumLength();
     }
     
     private void mockChannelResponse(final Object response) {
@@ -136,25 +473,18 @@ class MySQLBinlogClientTest {
     @SneakyThrows(InterruptedException.class)
     @SuppressWarnings("unchecked")
     private void mockChannelResponseInThread(final Object response) {
-        long t1 = System.currentTimeMillis();
+        long startMillis = System.currentTimeMillis();
         do {
-            Promise<Object> responseCallback;
+            Promise<Object> callback;
             try {
-                responseCallback = (Promise<Object>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("responseCallback"),
 client);
+                callback = (Promise<Object>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("responseCallback"),
 client);
             } catch (final ReflectiveOperationException ex) {
                 throw new RuntimeException(ex);
             }
-            if (null != responseCallback && !responseCallback.isDone()) {
-                responseCallback.setSuccess(response);
+            if (null != callback && !callback.isDone()) {
+                callback.setSuccess(response);
             }
-            TimeUnit.SECONDS.sleep(1L);
-        } while (System.currentTimeMillis() - t1 <= 
TimeUnit.SECONDS.toMillis(20L));
-    }
-    
-    @Test
-    void assertPollOnNotRunning() throws ReflectiveOperationException {
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("running"),
 client, false);
-        assertThat(client.poll(), is(Collections.emptyList()));
+            TimeUnit.MILLISECONDS.sleep(100L);
+        } while (System.currentTimeMillis() - startMillis <= 
TimeUnit.SECONDS.toMillis(20L));
     }
 }

Reply via email to