This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4115fc2ab Refactor DataRecordResultConvertUtilsTest (#37408)
0a4115fc2ab is described below
commit 0a4115fc2ab6306c05dcd912a077add1fe03ac46
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 16 22:43:00 2025 +0800
Refactor DataRecordResultConvertUtilsTest (#37408)
* Refactor DataRecordResultConvertUtilsTest
* Refactor DataRecordResultConvertUtilsTest
---
.../cdc/util/DataRecordResultConvertUtilsTest.java | 78 ++++++++--------------
1 file changed, 27 insertions(+), 51 deletions(-)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 1c820f28b36..8deef2250cb 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -17,69 +17,45 @@
package org.apache.shardingsphere.data.pipeline.cdc.util;
-import com.google.protobuf.EmptyProto;
+import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TimestampProto;
-import com.google.protobuf.TypeRegistry;
-import com.google.protobuf.WrappersProto;
-import com.google.protobuf.util.JsonFormat;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Date;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.OffsetTime;
+import java.util.stream.Stream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
class DataRecordResultConvertUtilsTest {
- @Test
- void assertConvertDataRecordToRecord() throws
InvalidProtocolBufferException, SQLException {
- DataRecord dataRecord = new
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new
IntegerPrimaryKeyIngestPosition(0L, 1L), 2);
- dataRecord.addColumn(new NormalColumn("order_id", BigInteger.ONE,
false, true));
- dataRecord.addColumn(new NormalColumn("price",
BigDecimal.valueOf(123L), false, false));
- dataRecord.addColumn(new NormalColumn("user_id", Long.MAX_VALUE,
false, false));
- dataRecord.addColumn(new NormalColumn("item_id", Integer.MAX_VALUE,
false, false));
- dataRecord.addColumn(new NormalColumn("create_date", LocalDate.now(),
false, false));
- dataRecord.addColumn(new NormalColumn("create_date2",
Date.valueOf(LocalDate.now()), false, false));
- dataRecord.addColumn(new NormalColumn("create_time", LocalTime.now(),
false, false));
- dataRecord.addColumn(new NormalColumn("create_time2",
OffsetTime.now(), false, false));
- dataRecord.addColumn(new NormalColumn("create_datetime",
LocalDateTime.now(), false, false));
- dataRecord.addColumn(new NormalColumn("create_datetime2",
OffsetDateTime.now(), false, false));
- dataRecord.addColumn(new NormalColumn("empty", null, false, false));
- Blob mockedBlob = mock(Blob.class);
- when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new
byte[]{-1, 0, 1});
- dataRecord.addColumn(new NormalColumn("data_blob", mockedBlob, false,
false));
- Clob mockedClob = mock(Clob.class);
- when(mockedClob.getSubString(anyLong(),
anyInt())).thenReturn("clob\n");
- dataRecord.addColumn(new NormalColumn("text_clob", mockedClob, false,
false));
- dataRecord.addColumn(new NormalColumn("update_time", new
Timestamp(System.currentTimeMillis()), false, false));
- TypeRegistry registry =
TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
- .add(WrappersProto.getDescriptor().getMessageTypes()).build();
- Record expectedRecord =
DataRecordResultConvertUtils.convertDataRecordToRecord("test", null,
dataRecord);
- String print =
JsonFormat.printer().usingTypeRegistry(registry).print(expectedRecord);
- Builder actualRecord = Record.newBuilder();
- JsonFormat.parser().usingTypeRegistry(registry).merge(print,
actualRecord);
- assertThat(actualRecord.build(), is(expectedRecord));
+ @ParameterizedTest
+ @MethodSource("dataChangeTypeTestCases")
+ void assertConvertDataRecordToRecordWithNonInsertTypes(final
PipelineSQLOperationType operationType, final Record.DataChangeType
expectedDataChangeType) throws InvalidProtocolBufferException {
+ DataRecord dataRecord = new DataRecord(operationType, "test_schema",
"t_user", new IntegerPrimaryKeyIngestPosition(5L, 10L), 1);
+ dataRecord.addColumn(new NormalColumn("id", 1L, 2L, true, true));
+ dataRecord.setCommitTime(123L);
+ Record actualRecord =
DataRecordResultConvertUtils.convertDataRecordToRecord("logic_db",
"test_schema", dataRecord);
+ assertThat(actualRecord.getMetaData().getDatabase(), is("logic_db"));
+ assertThat(actualRecord.getMetaData().getSchema(), is("test_schema"));
+ assertThat(actualRecord.getMetaData().getTable(), is("t_user"));
+ assertThat(actualRecord.getTransactionCommitMillis(), is(123L));
+ assertThat(actualRecord.getDataChangeType(),
is(expectedDataChangeType));
+
assertThat(actualRecord.getBefore(0).getValue().unpack(Int64Value.class).getValue(),
is(1L));
+
assertThat(actualRecord.getAfter(0).getValue().unpack(Int64Value.class).getValue(),
is(2L));
+ }
+
+ private static Stream<Arguments> dataChangeTypeTestCases() {
+ return Stream.of(
+ Arguments.of(PipelineSQLOperationType.INSERT,
Record.DataChangeType.INSERT),
+ Arguments.of(PipelineSQLOperationType.UPDATE,
Record.DataChangeType.UPDATE),
+ Arguments.of(PipelineSQLOperationType.DELETE,
Record.DataChangeType.DELETE),
+ Arguments.of(PipelineSQLOperationType.SELECT,
Record.DataChangeType.UNKNOWN));
}
}