This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a98b44469f8 Add more test cases of MppdbDecodingPluginTest (#37432)
a98b44469f8 is described below
commit a98b44469f82ca63337dcd6ec4b76808055adfae
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 18 23:32:23 2025 +0800
Add more test cases of MppdbDecodingPluginTest (#37432)
* Add more test cases of MppdbDecodingPluginTest
* Add more test cases of MppdbDecodingPluginTest
* Remove unreached codes on MySQLIncrementalDumper
* Remove unreached codes on MySQLIncrementalDumper
---
.../incremental/dumper/MySQLIncrementalDumper.java | 4 +-
.../wal/decode/MppdbDecodingPluginTest.java | 115 +++++++++++++++++++++
2 files changed, 116 insertions(+), 3 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index f69dedb9f98..525de8fc997 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -200,8 +200,6 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
@Override
protected void doStop() {
- if (null != client) {
- client.closeChannel();
- }
+ client.closeChannel();
}
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
index 3a343cdd362..f0ad76e83a0 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
@@ -31,7 +31,11 @@ import org.junit.jupiter.api.Test;
import org.opengauss.jdbc.TimestampUtils;
import org.opengauss.replication.LogSequenceNumber;
import org.opengauss.util.PGobject;
+import org.mockito.MockedConstruction;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.sql.Time;
@@ -39,6 +43,7 @@ import java.sql.Timestamp;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.stream.Stream;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
@@ -46,7 +51,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.when;
class MppdbDecodingPluginTest {
@@ -329,4 +337,111 @@ class MppdbDecodingPluginTest {
Object byteaObj = actual.getAfterRow().get(0);
assertThat(byteaObj, is(255));
}
+
+ @Test
+ void assertDecodeWriteRowEventWithVariousTypes() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ String[] columnTypes = new String[]{"numeric(10,2)", "bit(3)", "real",
"double precision", "money", "bytea", "blob", "interval", "character", "text",
"tstzrange"};
+ tableData.setColumnsName(IntStream.range(0,
columnTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
+ tableData.setColumnsType(columnTypes);
+ String[] columnValues = new String[]{"10.1", "b101", "1.5", "2.5",
"'1.08'", "'\\x'", "'\\x01'", "'abc year'", "a", "null", "'[\"2020-01-01
00:00:00+00\",\"2021-01-01 00:00:00+00\"]'"};
+ tableData.setColumnsVal(columnValues);
+ WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null,
false,
false).decode(ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()),
logSequenceNumber);
+ assertThat(actual.getAfterRow().get(0), is(new BigDecimal("10.1")));
+ assertThat(actual.getAfterRow().get(1), is("101"));
+ assertThat(actual.getAfterRow().get(2), is(1.5F));
+ assertThat(actual.getAfterRow().get(3), is(2.5D));
+ assertThat(actual.getAfterRow().get(4), is("1.08"));
+ assertThat(((byte[]) actual.getAfterRow().get(5)).length, is(0));
+ assertThat(((byte[]) actual.getAfterRow().get(6))[0], is((byte) 1));
+ assertNull(actual.getAfterRow().get(7));
+ assertThat(actual.getAfterRow().get(8), is("a"));
+ assertNull(actual.getAfterRow().get(9));
+ assertThat(actual.getAfterRow().get(10).toString(), is("[\"2020-01-01
00:00:00+00\",\"2021-01-01 00:00:00+00\"]"));
+ }
+
+ @Test
+ void assertDecodeSeriallyPlaceholderEvent() {
+ ByteBuffer data = ByteBuffer.wrap("RANDOM".getBytes());
+ AbstractWALEvent actual = new MppdbDecodingPlugin(null, true,
false).decode(data, logSequenceNumber);
+ assertThat(actual, isA(PlaceholderEvent.class));
+ }
+
+ @Test
+ void assertDecodeParallelBeginWithoutFirstLsn() {
+ MppdbDecodingPlugin mppdbDecodingPlugin = new
MppdbDecodingPlugin(null, true, true);
+ AbstractWALEvent beginEvent =
mppdbDecodingPlugin.decode(ByteBuffer.wrap("BEGIN CSN: 10".getBytes()),
logSequenceNumber);
+ assertThat(beginEvent, isA(BeginTXEvent.class));
+ assertThat(((BeginTXEvent) beginEvent).getCsn(), is(0L));
+ AbstractWALEvent placeholderEvent =
mppdbDecodingPlugin.decode(ByteBuffer.wrap("OTHER".getBytes()),
logSequenceNumber);
+ assertThat(placeholderEvent, isA(PlaceholderEvent.class));
+ }
+
+ @Test
+ void assertDecodeParallelCommitUppercase() {
+ AbstractWALEvent actual = new MppdbDecodingPlugin(null, true,
true).decode(ByteBuffer.wrap("COMMIT xid: 20".getBytes()), logSequenceNumber);
+ assertThat(actual, isA(CommitTXEvent.class));
+ assertThat(((CommitTXEvent) actual).getXid(), is(20L));
+ assertNull(((CommitTXEvent) actual).getCsn());
+ }
+
+ @Test
+ void assertDecodeSelectRowEventType() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("SELECT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"character varying"});
+ tableData.setColumnsVal(new String[]{"'1'"});
+ ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+ assertThrows(IngestException.class, () -> new
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
+ }
+
+ @Test
+ void assertDecodeTimestampThrowsDecodingException() throws SQLException {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"timestamp without time zone"});
+ tableData.setColumnsVal(new String[]{"'2020-01-01'"});
+ TimestampUtils timestampUtils = mock(TimestampUtils.class);
+ when(timestampUtils.toTimestamp(null, "2020-01-01")).thenThrow(new
SQLException(""));
+ ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+ assertThrows(DecodingException.class, () -> new
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), false,
false).decode(data, logSequenceNumber));
+ }
+
+ @Test
+ void assertDecodePgObjectWhenSetValueThrowsSQLException() {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"raw"});
+ tableData.setColumnsVal(new String[]{"'7D'"});
+ ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+ try (MockedConstruction<PGobject> ignored =
mockConstruction(PGobject.class, (mocked, mockContext) -> doThrow(new
SQLException()).when(mocked).setValue(anyString()))) {
+ WriteRowEvent actual = (WriteRowEvent) new
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber);
+ assertNull(actual.getAfterRow().get(0));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("invalidHexValueProvider")
+ void assertDecodeByteaWithInvalidHexValue(final String hexValue) {
+ MppTableData tableData = new MppTableData();
+ tableData.setTableName("public.test");
+ tableData.setOpType("INSERT");
+ tableData.setColumnsName(new String[]{"data"});
+ tableData.setColumnsType(new String[]{"bytea"});
+ tableData.setColumnsVal(new String[]{hexValue});
+ ByteBuffer data =
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+ assertThrows(IllegalArgumentException.class, () -> new
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
+ }
+
+ private static Stream<String> invalidHexValueProvider() {
+ return Stream.of("'\\xabc'", "'\\x0g'", "'\\xg0'");
+ }
}