This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 759de54d5f3 Add more test cases on TestDecodingPluginTest (#37453)
759de54d5f3 is described below
commit 759de54d5f364f02c6624ef5bb4dd673e7b4ac02
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 21 19:39:55 2025 +0800
Add more test cases on TestDecodingPluginTest (#37453)
* Add TestDecodingPluginTest
* Add more test cases on TestDecodingPluginTest
* Add more test cases on TestDecodingPluginTest
---
.../wal/decode/TestDecodingPluginTest.java | 135 +++++++++++++++++++--
1 file changed, 127 insertions(+), 8 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
index ec3dbf692f1..56c072f7745 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/decode/TestDecodingPluginTest.java
@@ -26,12 +26,22 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.UpdateRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.replication.LogSequenceNumber;
+import java.math.BigDecimal;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.sql.Date;
import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,8 +88,8 @@ class TestDecodingPluginTest {
@Test
void assertDecodeUpdateRowEvent() {
- ByteBuffer data = ByteBuffer.wrap("table public.test: UPDATE:
unicode[character varying]:' 1 2 3'' ðŸ˜Šä¸ ' t_json_empty[json]:'{}'
t_json[json]:'{\"test\":\"ä¸ä¸{ä¸ä¸}' ä¸\"}'"
- .getBytes(StandardCharsets.UTF_8));
+ ByteBuffer data = ByteBuffer.wrap(
+ "table public.test: UPDATE: unicode[character varying]:' 1 2
3'' ðŸ˜Šä¸ ' t_json_empty[json]:'{}' t_json[json]:'{\"test\":\"ä¸ä¸{ä¸ä¸}'
ä¸\"}'".getBytes(StandardCharsets.UTF_8));
UpdateRowEvent actual = (UpdateRowEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
assertThat(actual.getLogSequenceNumber(), is(logSequenceNumber));
assertThat(actual.getTableName(), is("test"));
@@ -112,12 +122,6 @@ class TestDecodingPluginTest {
assertThat(new TestDecodingPlugin(null).decode(data,
logSequenceNumber), isA(PlaceholderEvent.class));
}
- @Test
- void assertDecodeUnknownRowEventType() {
- ByteBuffer data = ByteBuffer.wrap("table public.test: UNKNOWN:
data[character varying]:'1 2 3'''".getBytes(StandardCharsets.UTF_8));
- assertThrows(IngestException.class, () -> new
TestDecodingPlugin(null).decode(data, logSequenceNumber));
- }
-
@Test
void assertDecodeTime() throws SQLException {
TimestampUtils timestampUtils = mock(TimestampUtils.class);
@@ -145,4 +149,119 @@ class TestDecodingPluginTest {
AbstractWALEvent actual = new TestDecodingPlugin(null).decode(data,
logSequenceNumber);
assertThat(actual, isA(WriteRowEvent.class));
}
+
+ @Test
+ void assertDecodeColumnWithoutBracketThrowsBufferUnderflow() {
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT:
column_without_bracket".getBytes(StandardCharsets.UTF_8));
+ assertThrows(BufferUnderflowException.class, () -> new
TestDecodingPlugin(null).decode(data, logSequenceNumber));
+ }
+
+ @Test
+ void assertDecodeStringFollowedBySpaceStopsAtSpace() {
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT:
col_text[character varying]:'abc'
col_int[integer]:1".getBytes(StandardCharsets.UTF_8));
+ WriteRowEvent actual = (WriteRowEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getAfterRow().get(0), is("abc"));
+ assertThat(actual.getAfterRow().get(1), is(1));
+ }
+
+ @Test
+ void assertDecodeVariousColumnTypes() {
+ BaseTimestampUtils timestampUtils = new BaseTimestampUtils() {
+
+ @Override
+ public Time toTime(final Calendar cal, final String input) throws
SQLException {
+ return Time.valueOf(input);
+ }
+
+ @Override
+ public Timestamp toTimestamp(final Calendar cal, final String
input) throws SQLException {
+ return Timestamp.valueOf(input);
+ }
+ };
+ String wal = "table public.test: INSERT: foo_numeric[numeric]:123.45
foo_bit[bit]:101 foo_smallint[smallint]:1 foo_bigint[bigint]:1234567890123"
+ + " foo_real[real]:1.5 foo_double[double precision]:2.5
foo_boolean[boolean]:true foo_date[date]:'2020-01-02'"
+ + " foo_time[time without time zone]:'12:34:56'
foo_timestamp[timestamp without time zone]:'2020-01-02 03:04:05'"
+ + " foo_bytea[bytea]:'\\x' foo_json[json]:'{\"a\":{\"b\":1}}'
foo_text[character varying]:'abc''def' foo_null[integer]:null";
+ WriteRowEvent actual = (WriteRowEvent) new
TestDecodingPlugin(timestampUtils).decode(ByteBuffer.wrap(wal.getBytes(StandardCharsets.UTF_8)),
logSequenceNumber);
+ assertThat(actual.getAfterRow().get(0), is(new BigDecimal("123.45")));
+ assertThat(actual.getAfterRow().get(1), is("101"));
+ assertThat(actual.getAfterRow().get(2), is((short) 1));
+ assertThat(actual.getAfterRow().get(3), is(1234567890123L));
+ assertThat(actual.getAfterRow().get(4), is(1.5F));
+ assertThat(actual.getAfterRow().get(5), is(2.5D));
+ assertThat(actual.getAfterRow().get(6), is(true));
+ assertThat(actual.getAfterRow().get(7),
is(Date.valueOf("2020-01-02")));
+ assertThat(actual.getAfterRow().get(8), is(Time.valueOf("12:34:56")));
+ assertThat(actual.getAfterRow().get(9),
is(Timestamp.valueOf("2020-01-02 03:04:05")));
+ assertThat(actual.getAfterRow().get(10), is(new byte[0]));
+ assertThat(actual.getAfterRow().get(11), is("{\"a\":{\"b\":1}}"));
+ assertThat(actual.getAfterRow().get(12), is("abc'def"));
+ assertNull(actual.getAfterRow().get(13));
+ }
+
+ @Test
+ void assertDecodeUnterminatedString() {
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT:
text_col[character varying]:'unterminated".getBytes(StandardCharsets.UTF_8));
+ WriteRowEvent actual = (WriteRowEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertThat(actual.getAfterRow().get(0), is("unterminated"));
+ }
+
+ @Test
+ void assertDecodeJsonWithNoClosingBrace() {
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT:
data[json]:'{'".getBytes(StandardCharsets.UTF_8));
+ WriteRowEvent actual = (WriteRowEvent) new
TestDecodingPlugin(null).decode(data, logSequenceNumber);
+ assertNull(actual.getAfterRow().get(0));
+ }
+
+ @Test
+ void assertDecodeTimestampSQLException() throws SQLException {
+ TimestampUtils timestampUtils = mock(TimestampUtils.class);
+ when(timestampUtils.toTimestamp(null, "2020-01-02
03:04:05")).thenThrow(new SQLException(""));
+ ByteBuffer data = ByteBuffer.wrap("table public.test: INSERT:
data[timestamp without time zone]:'2020-01-02
03:04:05'".getBytes(StandardCharsets.UTF_8));
+ assertThrows(DecodingException.class, () -> new TestDecodingPlugin(new
PostgreSQLTimestampUtils(timestampUtils)).decode(data, logSequenceNumber));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNullValueWal")
+ void assertDecodeNullValue(final String wal, final int expectedSize, final
Integer expectedSecond) {
+ WriteRowEvent actual = (WriteRowEvent) new
TestDecodingPlugin(null).decode(ByteBuffer.wrap(wal.getBytes(StandardCharsets.UTF_8)),
logSequenceNumber);
+ assertThat(actual.getAfterRow().size(), is(expectedSize));
+ assertNull(actual.getAfterRow().get(0));
+ if (null != expectedSecond) {
+ assertThat(actual.getAfterRow().get(1), is(expectedSecond));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideIllegalByteaWal")
+ void assertDecodeByteaIllegalArgument(final String wal) {
+ ByteBuffer data =
ByteBuffer.wrap(wal.getBytes(StandardCharsets.UTF_8));
+ assertThrows(IllegalArgumentException.class, () -> new
TestDecodingPlugin(null).decode(data, logSequenceNumber));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideIngestExceptionWal")
+ void assertDecodeIngestException(final String wal) {
+ ByteBuffer data =
ByteBuffer.wrap(wal.getBytes(StandardCharsets.UTF_8));
+ assertThrows(IngestException.class, () -> new
TestDecodingPlugin(null).decode(data, logSequenceNumber));
+ }
+
+ private static Stream<String> provideIllegalByteaWal() {
+ return Stream.of(
+ "table public.test: INSERT: data[bytea]:'\\xff0'",
+ "table public.test: INSERT: data[bytea]:'\\xzz'");
+ }
+
+ private static Stream<String> provideIngestExceptionWal() {
+ return Stream.of(
+ "table public.test: UNKNOWN: data[character varying]:'1 2
3'''",
+ "table public.test: SELECT: id[integer]:1",
+ "table public.test: INSERT: data[json]:'{}x'");
+ }
+
+ private static Stream<Arguments> provideNullValueWal() {
+ return Stream.of(
+ Arguments.of("table public.test: INSERT:
col_null[integer]:null col_int[integer]:1", 2, 1),
+ Arguments.of("table public.test: INSERT:
col_null[integer]:null", 1, null));
+ }
}