This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b6d3ec92e Add more test cases on MySQLIncrementalDumperTest (#37433)
d1b6d3ec92e is described below

commit d1b6d3ec92e8b683e3395a3a1687707cdf5caa02
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 19 00:30:08 2025 +0800

    Add more test cases on MySQLIncrementalDumperTest (#37433)
    
    * Add more test cases of MppdbDecodingPluginTest
    
    * Add more test cases of MppdbDecodingPluginTest
    
    * Remove unreached codes on MySQLIncrementalDumper
    
    * Remove unreached codes on MySQLIncrementalDumper
    
    * Add more test cases on MySQLIncrementalDumperTest
---
 .../dumper/MySQLIncrementalDumperTest.java         | 126 ++++++++++-----------
 1 file changed, 58 insertions(+), 68 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index df899c688ab..19968fe9874 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -18,7 +18,7 @@
 package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.dumper;
 
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -27,56 +27,64 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.test.infra.fixture.jdbc.MockedDriver;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.io.Serializable;
-import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.LENIENT)
-@SuppressWarnings("unchecked")
 class MySQLIncrementalDumperTest {
     
-    private MySQLIncrementalDumper incrementalDumper;
+    private IncrementalDumperContext dumperContext;
     
-    private PipelineTableMetaData pipelineTableMetaData;
+    private PipelineTableMetaDataLoader metaDataLoader;
     
     @BeforeAll
     static void init() throws ClassNotFoundException {
@@ -85,15 +93,10 @@ class MySQLIncrementalDumperTest {
     
     @BeforeEach
     void setUp() throws SQLException {
-        IncrementalDumperContext dumperContext = createDumperContext();
+        dumperContext = createDumperContext();
         initTableData(dumperContext);
-        PipelineTableMetaDataLoader metaDataLoader = 
mock(PipelineTableMetaDataLoader.class);
-        MemoryPipelineChannel channel = new MemoryPipelineChannel(10000, 
records -> {
-            
-        });
-        incrementalDumper = new MySQLIncrementalDumper(dumperContext, new 
MySQLBinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
-        pipelineTableMetaData = new PipelineTableMetaData("t_order", 
mockOrderColumnsMetaDataMap(), Collections.emptyList());
-        when(metaDataLoader.getTableMetaData(any(), 
any())).thenReturn(pipelineTableMetaData);
+        metaDataLoader = mock(PipelineTableMetaDataLoader.class);
+        when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(new 
PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), 
Collections.emptyList()));
     }
     
     private IncrementalDumperContext createDumperContext() {
@@ -132,61 +135,48 @@ class MySQLIncrementalDumperTest {
         return result;
     }
     
+    @SuppressWarnings("unchecked")
     @Test
-    void assertWriteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByWriteRowsEvent(new 
MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"})));
-        assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), isA(DataRecord.class));
-        assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.INSERT));
-        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
-    }
-    
-    private List<Record> getRecordsByWriteRowsEvent(final 
MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", 
MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
-        return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
-    }
-    
-    @Test
-    void assertUpdateRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByUpdateRowsEvent(new 
MySQLUpdateRowsBinlogEvent(
-                "", 0, 0L, "test", "t_order", Collections.singletonList(new 
Serializable[]{101, 1, "OK"}), Collections.singletonList(new 
Serializable[]{101, 1, "OK2"})));
-        assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), isA(DataRecord.class));
-        assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.UPDATE));
-        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
-    }
-    
-    private List<Record> getRecordsByUpdateRowsEvent(final 
MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", 
MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
-        return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
-    }
-    
-    @Test
-    void assertDeleteRowsEvent() throws ReflectiveOperationException {
-        List<Record> actual = getRecordsByDeleteRowsEvent(new 
MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"})));
-        assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), isA(DataRecord.class));
-        assertThat(((DataRecord) actual.get(0)).getType(), 
is(PipelineSQLOperationType.DELETE));
-        assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
-    }
-    
-    private List<Record> getRecordsByDeleteRowsEvent(final 
MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
-        Method method = 
MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", 
MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
-        return (List<Record>) Plugins.getMemberAccessor().invoke(method, 
incrementalDumper, rowsEvent, pipelineTableMetaData);
-    }
-    
-    @Test
-    void assertPlaceholderEvent() throws ReflectiveOperationException {
-        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 MySQLBaseBinlogEvent.class),
-                incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L));
-        assertThat(actual.size(), is(1));
-    }
-    
-    @Test
-    void assertRowsEventFiltered() throws ReflectiveOperationException {
-        List<Record> actual = (List<Record>) 
Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent",
 MySQLBaseBinlogEvent.class),
-                incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L, 
"test", "t_order", Collections.singletonList(new Serializable[]{1})));
-        assertThat(actual.size(), is(1));
-        assertThat(actual.get(0), isA(DataRecord.class));
+    void assertRunBlockingCoversAllBranches() throws 
ReflectiveOperationException, InterruptedException {
+        MySQLBaseRowsBinlogEvent unsupportedEvent = 
mock(MySQLBaseRowsBinlogEvent.class);
+        when(unsupportedEvent.getDatabaseName()).thenReturn("test");
+        when(unsupportedEvent.getTableName()).thenReturn("t_order");
+        MySQLWriteRowsBinlogEvent filteredEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        MySQLWriteRowsBinlogEvent writeEvent = new 
MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        MySQLUpdateRowsBinlogEvent updateEvent = new 
MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
+                Collections.singletonList(new Serializable[]{101, 1, "OK"}), 
Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
+        MySQLDeleteRowsBinlogEvent deleteEvent = new 
MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", 
Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+        List<MySQLBaseBinlogEvent> firstPollEvents = Arrays.asList(new 
PlaceholderBinlogEvent("binlog-000001", 3L, 1L), writeEvent, updateEvent, 
deleteEvent, filteredEvent, unsupportedEvent);
+        PipelineChannel channel = mock(PipelineChannel.class);
+        MySQLIncrementalDumper dumper = new 
MySQLIncrementalDumper(dumperContext, new MySQLBinlogPosition("binlog-000001", 
4L), channel, metaDataLoader);
+        MySQLBinlogClient client = mock(MySQLBinlogClient.class);
+        AtomicInteger counter = new AtomicInteger();
+        when(client.poll()).thenAnswer(invocation -> {
+            if (0 == counter.getAndIncrement()) {
+                return firstPollEvents;
+            }
+            dumper.stop();
+            return Collections.singletonList(unsupportedEvent);
+        });
+        
Plugins.getMemberAccessor().set(MySQLIncrementalDumper.class.getDeclaredField("client"),
 dumper, client);
+        Thread dumperThread = new Thread(dumper::start);
+        dumperThread.start();
+        dumperThread.join(1000L);
+        verify(client).connect();
+        verify(client).subscribe("binlog-000001", 4L);
+        verify(client, timeout(1000L)).closeChannel();
+        ArgumentCaptor<List<Record>> captor = 
ArgumentCaptor.forClass(List.class);
+        verify(channel, timeout(1000L)).push(captor.capture());
+        List<Record> pushed = captor.getValue();
+        assertThat(pushed.size(), is(5));
+        assertThat(pushed.get(0), isA(PlaceholderRecord.class));
+        assertThat(pushed.get(0).getCommitTime(), is(1000L));
+        assertThat(((DataRecord) pushed.get(1)).getType(), 
is(PipelineSQLOperationType.INSERT));
+        DataRecord updatedRecord = (DataRecord) pushed.get(2);
+        assertThat(updatedRecord.getType(), 
is(PipelineSQLOperationType.UPDATE));
+        assertTrue(updatedRecord.getColumn(2).isUpdated());
+        assertThat(((DataRecord) pushed.get(3)).getType(), 
is(PipelineSQLOperationType.DELETE));
+        assertThat(pushed.get(4), isA(PlaceholderRecord.class));
+        assertFalse(dumperThread.isAlive());
     }
 }

Reply via email to