This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a98b44469f8 Add more test cases of MppdbDecodingPluginTest (#37432)
a98b44469f8 is described below

commit a98b44469f82ca63337dcd6ec4b76808055adfae
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Dec 18 23:32:23 2025 +0800

    Add more test cases of MppdbDecodingPluginTest (#37432)
    
    * Add more test cases of MppdbDecodingPluginTest
    
    * Add more test cases of MppdbDecodingPluginTest
    
    * Remove unreached codes on MySQLIncrementalDumper
    
    * Remove unreached codes on MySQLIncrementalDumper
---
 .../incremental/dumper/MySQLIncrementalDumper.java |   4 +-
 .../wal/decode/MppdbDecodingPluginTest.java        | 115 +++++++++++++++++++++
 2 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index f69dedb9f98..525de8fc997 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -200,8 +200,6 @@ public final class MySQLIncrementalDumper extends AbstractPipelineLifecycleRunna
     
     @Override
     protected void doStop() {
-        if (null != client) {
-            client.closeChannel();
-        }
+        client.closeChannel();
     }
 }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
index 3a343cdd362..f0ad76e83a0 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/incremental/wal/decode/MppdbDecodingPluginTest.java
@@ -31,7 +31,11 @@ import org.junit.jupiter.api.Test;
 import org.opengauss.jdbc.TimestampUtils;
 import org.opengauss.replication.LogSequenceNumber;
 import org.opengauss.util.PGobject;
+import org.mockito.MockedConstruction;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
 import java.sql.Time;
@@ -39,6 +43,7 @@ import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.Stream;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -46,7 +51,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.when;
 
 class MppdbDecodingPluginTest {
@@ -329,4 +337,111 @@ class MppdbDecodingPluginTest {
         Object byteaObj = actual.getAfterRow().get(0);
         assertThat(byteaObj, is(255));
     }
+    
+    @Test
+    void assertDecodeWriteRowEventWithVariousTypes() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        String[] columnTypes = new String[]{"numeric(10,2)", "bit(3)", "real", 
"double precision", "money", "bytea", "blob", "interval", "character", "text", 
"tstzrange"};
+        tableData.setColumnsName(IntStream.range(0, 
columnTypes.length).mapToObj(idx -> "data" + idx).toArray(String[]::new));
+        tableData.setColumnsType(columnTypes);
+        String[] columnValues = new String[]{"10.1", "b101", "1.5", "2.5", 
"'1.08'", "'\\x'", "'\\x01'", "'abc year'", "a", "null", "'[\"2020-01-01 
00:00:00+00\",\"2021-01-01 00:00:00+00\"]'"};
+        tableData.setColumnsVal(columnValues);
+        WriteRowEvent actual = (WriteRowEvent) new MppdbDecodingPlugin(null, 
false, 
false).decode(ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes()), 
logSequenceNumber);
+        assertThat(actual.getAfterRow().get(0), is(new BigDecimal("10.1")));
+        assertThat(actual.getAfterRow().get(1), is("101"));
+        assertThat(actual.getAfterRow().get(2), is(1.5F));
+        assertThat(actual.getAfterRow().get(3), is(2.5D));
+        assertThat(actual.getAfterRow().get(4), is("1.08"));
+        assertThat(((byte[]) actual.getAfterRow().get(5)).length, is(0));
+        assertThat(((byte[]) actual.getAfterRow().get(6))[0], is((byte) 1));
+        assertNull(actual.getAfterRow().get(7));
+        assertThat(actual.getAfterRow().get(8), is("a"));
+        assertNull(actual.getAfterRow().get(9));
+        assertThat(actual.getAfterRow().get(10).toString(), is("[\"2020-01-01 
00:00:00+00\",\"2021-01-01 00:00:00+00\"]"));
+    }
+    
+    @Test
+    void assertDecodeSeriallyPlaceholderEvent() {
+        ByteBuffer data = ByteBuffer.wrap("RANDOM".getBytes());
+        AbstractWALEvent actual = new MppdbDecodingPlugin(null, true, 
false).decode(data, logSequenceNumber);
+        assertThat(actual, isA(PlaceholderEvent.class));
+    }
+    
+    @Test
+    void assertDecodeParallelBeginWithoutFirstLsn() {
+        MppdbDecodingPlugin mppdbDecodingPlugin = new 
MppdbDecodingPlugin(null, true, true);
+        AbstractWALEvent beginEvent = 
mppdbDecodingPlugin.decode(ByteBuffer.wrap("BEGIN CSN: 10".getBytes()), 
logSequenceNumber);
+        assertThat(beginEvent, isA(BeginTXEvent.class));
+        assertThat(((BeginTXEvent) beginEvent).getCsn(), is(0L));
+        AbstractWALEvent placeholderEvent = 
mppdbDecodingPlugin.decode(ByteBuffer.wrap("OTHER".getBytes()), 
logSequenceNumber);
+        assertThat(placeholderEvent, isA(PlaceholderEvent.class));
+    }
+    
+    @Test
+    void assertDecodeParallelCommitUppercase() {
+        AbstractWALEvent actual = new MppdbDecodingPlugin(null, true, 
true).decode(ByteBuffer.wrap("COMMIT xid: 20".getBytes()), logSequenceNumber);
+        assertThat(actual, isA(CommitTXEvent.class));
+        assertThat(((CommitTXEvent) actual).getXid(), is(20L));
+        assertNull(((CommitTXEvent) actual).getCsn());
+    }
+    
+    @Test
+    void assertDecodeSelectRowEventType() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("SELECT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"character varying"});
+        tableData.setColumnsVal(new String[]{"'1'"});
+        ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+        assertThrows(IngestException.class, () -> new 
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
+    }
+    
+    @Test
+    void assertDecodeTimestampThrowsDecodingException() throws SQLException {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"timestamp without time zone"});
+        tableData.setColumnsVal(new String[]{"'2020-01-01'"});
+        TimestampUtils timestampUtils = mock(TimestampUtils.class);
+        when(timestampUtils.toTimestamp(null, "2020-01-01")).thenThrow(new 
SQLException(""));
+        ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+        assertThrows(DecodingException.class, () -> new 
MppdbDecodingPlugin(new OpenGaussTimestampUtils(timestampUtils), false, 
false).decode(data, logSequenceNumber));
+    }
+    
+    @Test
+    void assertDecodePgObjectWhenSetValueThrowsSQLException() {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"raw"});
+        tableData.setColumnsVal(new String[]{"'7D'"});
+        ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+        try (MockedConstruction<PGobject> ignored = 
mockConstruction(PGobject.class, (mocked, mockContext) -> doThrow(new 
SQLException()).when(mocked).setValue(anyString()))) {
+            WriteRowEvent actual = (WriteRowEvent) new 
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber);
+            assertNull(actual.getAfterRow().get(0));
+        }
+    }
+    
+    @ParameterizedTest
+    @MethodSource("invalidHexValueProvider")
+    void assertDecodeByteaWithInvalidHexValue(final String hexValue) {
+        MppTableData tableData = new MppTableData();
+        tableData.setTableName("public.test");
+        tableData.setOpType("INSERT");
+        tableData.setColumnsName(new String[]{"data"});
+        tableData.setColumnsType(new String[]{"bytea"});
+        tableData.setColumnsVal(new String[]{hexValue});
+        ByteBuffer data = 
ByteBuffer.wrap(JsonUtils.toJsonString(tableData).getBytes());
+        assertThrows(IllegalArgumentException.class, () -> new 
MppdbDecodingPlugin(null, false, false).decode(data, logSequenceNumber));
+    }
+    
+    private static Stream<String> invalidHexValueProvider() {
+        return Stream.of("'\\xabc'", "'\\x0g'", "'\\xg0'");
+    }
 }

Reply via email to