This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d1b6d3ec92e Add more test cases on MySQLIncrementalDumperTest (#37433)
d1b6d3ec92e is described below
commit d1b6d3ec92e8b683e3395a3a1687707cdf5caa02
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 19 00:30:08 2025 +0800
Add more test cases on MySQLIncrementalDumperTest (#37433)
* Add more test cases of MppdbDecodingPluginTest
* Add more test cases of MppdbDecodingPluginTest
* Remove unreached codes on MySQLIncrementalDumper
* Remove unreached codes on MySQLIncrementalDumper
* Add more test cases on MySQLIncrementalDumperTest
---
.../dumper/MySQLIncrementalDumperTest.java | 126 ++++++++++-----------
1 file changed, 58 insertions(+), 68 deletions(-)
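
The rewritten test below no longer reflects into the private handle*RowsEvent methods; it drives the dumper's public start()/stop() loop with a mocked binlog client and verifies what gets pushed to the channel. As a minimal sketch of that Mockito pattern, assuming hypothetical Source/Sink/PollingWorker stand-ins (not ShardingSphere classes), the flow looks roughly like this: stub poll() so the second call stops the worker, run the worker on its own thread, then capture and assert the pushed batch.

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

class PollingWorkerSketchTest {
    
    // Hypothetical stand-ins for a poll-based client and a record channel.
    interface Source { List<String> poll(); }
    
    interface Sink { void push(List<String> records); }
    
    // Simplified stand-in for the dumper loop: poll and push until stop() is called.
    static final class PollingWorker {
        
        private final Source source;
        
        private final Sink sink;
        
        private volatile boolean running = true;
        
        PollingWorker(final Source source, final Sink sink) {
            this.source = source;
            this.sink = sink;
        }
        
        void start() {
            while (running) {
                sink.push(source.poll());
            }
        }
        
        void stop() {
            running = false;
        }
    }
    
    @SuppressWarnings("unchecked")
    @Test
    void assertFirstPolledBatchIsPushed() throws InterruptedException {
        Source source = mock(Source.class);
        Sink sink = mock(Sink.class);
        PollingWorker worker = new PollingWorker(source, sink);
        AtomicInteger counter = new AtomicInteger();
        // First poll returns a batch; the second poll stops the worker so the loop terminates.
        when(source.poll()).thenAnswer(invocation -> {
            if (0 == counter.getAndIncrement()) {
                return Arrays.asList("write", "update", "delete");
            }
            worker.stop();
            return Collections.emptyList();
        });
        Thread workerThread = new Thread(worker::start);
        workerThread.start();
        workerThread.join(1000L);
        ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
        verify(sink, timeout(1000L).atLeastOnce()).push(captor.capture());
        assertEquals(Arrays.asList("write", "update", "delete"), captor.getAllValues().get(0));
        assertFalse(workerThread.isAlive());
    }
}
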
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index df899c688ab..19968fe9874 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.dumper;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -27,56 +27,64 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
+import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.test.infra.fixture.jdbc.MockedDriver;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.internal.configuration.plugins.Plugins;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import java.io.Serializable;
-import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
-@SuppressWarnings("unchecked")
class MySQLIncrementalDumperTest {
- private MySQLIncrementalDumper incrementalDumper;
+ private IncrementalDumperContext dumperContext;
- private PipelineTableMetaData pipelineTableMetaData;
+ private PipelineTableMetaDataLoader metaDataLoader;
@BeforeAll
static void init() throws ClassNotFoundException {
@@ -85,15 +93,10 @@ class MySQLIncrementalDumperTest {
@BeforeEach
void setUp() throws SQLException {
- IncrementalDumperContext dumperContext = createDumperContext();
+ dumperContext = createDumperContext();
initTableData(dumperContext);
- PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
- MemoryPipelineChannel channel = new MemoryPipelineChannel(10000, records -> {
-
- });
- incrementalDumper = new MySQLIncrementalDumper(dumperContext, new MySQLBinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
- pipelineTableMetaData = new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList());
- when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
+ metaDataLoader = mock(PipelineTableMetaDataLoader.class);
+ when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(new PipelineTableMetaData("t_order", mockOrderColumnsMetaDataMap(), Collections.emptyList()));
}
private IncrementalDumperContext createDumperContext() {
@@ -132,61 +135,48 @@ class MySQLIncrementalDumperTest {
return result;
}
+ @SuppressWarnings("unchecked")
@Test
- void assertWriteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
- assertThat(actual.size(), is(1));
- assertThat(actual.get(0), isA(DataRecord.class));
- assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
- }
-
- private List<Record> getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class);
- return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
- }
-
- @Test
- void assertUpdateRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByUpdateRowsEvent(new MySQLUpdateRowsBinlogEvent(
- "", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})));
- assertThat(actual.size(), is(1));
- assertThat(actual.get(0), isA(DataRecord.class));
- assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
- }
-
- private List<Record> getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class);
- return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
- }
-
- @Test
- void assertDeleteRowsEvent() throws ReflectiveOperationException {
- List<Record> actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"})));
- assertThat(actual.size(), is(1));
- assertThat(actual.get(0), isA(DataRecord.class));
- assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
- }
-
- private List<Record> getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException {
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class);
- return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
- }
-
- @Test
- void assertPlaceholderEvent() throws ReflectiveOperationException {
- List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
- incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L));
- assertThat(actual.size(), is(1));
- }
-
- @Test
- void assertRowsEventFiltered() throws ReflectiveOperationException {
- List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class),
- incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{1})));
- assertThat(actual.size(), is(1));
- assertThat(actual.get(0), isA(DataRecord.class));
+ void assertRunBlockingCoversAllBranches() throws ReflectiveOperationException, InterruptedException {
+ MySQLBaseRowsBinlogEvent unsupportedEvent = mock(MySQLBaseRowsBinlogEvent.class);
+ when(unsupportedEvent.getDatabaseName()).thenReturn("test");
+ when(unsupportedEvent.getTableName()).thenReturn("t_order");
+ MySQLWriteRowsBinlogEvent filteredEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 13L, 2L, "other_db", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ MySQLWriteRowsBinlogEvent writeEvent = new MySQLWriteRowsBinlogEvent("binlog-000001", 4L, 5L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ MySQLUpdateRowsBinlogEvent updateEvent = new MySQLUpdateRowsBinlogEvent("binlog-000001", 5L, 6L, "test", "t_order",
+ Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "UPDATED"}));
+ MySQLDeleteRowsBinlogEvent deleteEvent = new MySQLDeleteRowsBinlogEvent("binlog-000001", 6L, 7L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ List<MySQLBaseBinlogEvent> firstPollEvents = Arrays.asList(new PlaceholderBinlogEvent("binlog-000001", 3L, 1L), writeEvent, updateEvent, deleteEvent, filteredEvent, unsupportedEvent);
+ PipelineChannel channel = mock(PipelineChannel.class);
+ MySQLIncrementalDumper dumper = new MySQLIncrementalDumper(dumperContext, new MySQLBinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
+ MySQLBinlogClient client = mock(MySQLBinlogClient.class);
+ AtomicInteger counter = new AtomicInteger();
+ when(client.poll()).thenAnswer(invocation -> {
+ if (0 == counter.getAndIncrement()) {
+ return firstPollEvents;
+ }
+ dumper.stop();
+ return Collections.singletonList(unsupportedEvent);
+ });
+ Plugins.getMemberAccessor().set(MySQLIncrementalDumper.class.getDeclaredField("client"), dumper, client);
+ Thread dumperThread = new Thread(dumper::start);
+ dumperThread.start();
+ dumperThread.join(1000L);
+ verify(client).connect();
+ verify(client).subscribe("binlog-000001", 4L);
+ verify(client, timeout(1000L)).closeChannel();
+ ArgumentCaptor<List<Record>> captor = ArgumentCaptor.forClass(List.class);
+ verify(channel, timeout(1000L)).push(captor.capture());
+ List<Record> pushed = captor.getValue();
+ assertThat(pushed.size(), is(5));
+ assertThat(pushed.get(0), isA(PlaceholderRecord.class));
+ assertThat(pushed.get(0).getCommitTime(), is(1000L));
+ assertThat(((DataRecord) pushed.get(1)).getType(), is(PipelineSQLOperationType.INSERT));
+ DataRecord updatedRecord = (DataRecord) pushed.get(2);
+ assertThat(updatedRecord.getType(), is(PipelineSQLOperationType.UPDATE));
+ assertTrue(updatedRecord.getColumn(2).isUpdated());
+ assertThat(((DataRecord) pushed.get(3)).getType(), is(PipelineSQLOperationType.DELETE));
+ assertThat(pushed.get(4), isA(PlaceholderRecord.class));
+ assertFalse(dumperThread.isAlive());
}
}