This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4115fc2ab Refactor DataRecordResultConvertUtilsTest (#37408)
0a4115fc2ab is described below

commit 0a4115fc2ab6306c05dcd912a077add1fe03ac46
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Dec 16 22:43:00 2025 +0800

    Refactor DataRecordResultConvertUtilsTest (#37408)
    
    * Refactor DataRecordResultConvertUtilsTest
    
    * Refactor DataRecordResultConvertUtilsTest
---
 .../cdc/util/DataRecordResultConvertUtilsTest.java | 78 ++++++++--------------
 1 file changed, 27 insertions(+), 51 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
index 1c820f28b36..8deef2250cb 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilsTest.java
@@ -17,69 +17,45 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.util;
 
-import com.google.protobuf.EmptyProto;
+import com.google.protobuf.Int64Value;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TimestampProto;
-import com.google.protobuf.TypeRegistry;
-import com.google.protobuf.WrappersProto;
-import com.google.protobuf.util.JsonFormat;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.Date;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.OffsetTime;
+import java.util.stream.Stream;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 class DataRecordResultConvertUtilsTest {
     
-    @Test
-    void assertConvertDataRecordToRecord() throws 
InvalidProtocolBufferException, SQLException {
-        DataRecord dataRecord = new 
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new 
IntegerPrimaryKeyIngestPosition(0L, 1L), 2);
-        dataRecord.addColumn(new NormalColumn("order_id", BigInteger.ONE, 
false, true));
-        dataRecord.addColumn(new NormalColumn("price", 
BigDecimal.valueOf(123L), false, false));
-        dataRecord.addColumn(new NormalColumn("user_id", Long.MAX_VALUE, 
false, false));
-        dataRecord.addColumn(new NormalColumn("item_id", Integer.MAX_VALUE, 
false, false));
-        dataRecord.addColumn(new NormalColumn("create_date", LocalDate.now(), 
false, false));
-        dataRecord.addColumn(new NormalColumn("create_date2", 
Date.valueOf(LocalDate.now()), false, false));
-        dataRecord.addColumn(new NormalColumn("create_time", LocalTime.now(), 
false, false));
-        dataRecord.addColumn(new NormalColumn("create_time2", 
OffsetTime.now(), false, false));
-        dataRecord.addColumn(new NormalColumn("create_datetime", 
LocalDateTime.now(), false, false));
-        dataRecord.addColumn(new NormalColumn("create_datetime2", 
OffsetDateTime.now(), false, false));
-        dataRecord.addColumn(new NormalColumn("empty", null, false, false));
-        Blob mockedBlob = mock(Blob.class);
-        when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new 
byte[]{-1, 0, 1});
-        dataRecord.addColumn(new NormalColumn("data_blob", mockedBlob, false, 
false));
-        Clob mockedClob = mock(Clob.class);
-        when(mockedClob.getSubString(anyLong(), 
anyInt())).thenReturn("clob\n");
-        dataRecord.addColumn(new NormalColumn("text_clob", mockedClob, false, 
false));
-        dataRecord.addColumn(new NormalColumn("update_time", new 
Timestamp(System.currentTimeMillis()), false, false));
-        TypeRegistry registry = 
TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes())
-                .add(WrappersProto.getDescriptor().getMessageTypes()).build();
-        Record expectedRecord = 
DataRecordResultConvertUtils.convertDataRecordToRecord("test", null, 
dataRecord);
-        String print = 
JsonFormat.printer().usingTypeRegistry(registry).print(expectedRecord);
-        Builder actualRecord = Record.newBuilder();
-        JsonFormat.parser().usingTypeRegistry(registry).merge(print, 
actualRecord);
-        assertThat(actualRecord.build(), is(expectedRecord));
+    @ParameterizedTest
+    @MethodSource("dataChangeTypeTestCases")
+    void assertConvertDataRecordToRecordWithNonInsertTypes(final 
PipelineSQLOperationType operationType, final Record.DataChangeType 
expectedDataChangeType) throws InvalidProtocolBufferException {
+        DataRecord dataRecord = new DataRecord(operationType, "test_schema", 
"t_user", new IntegerPrimaryKeyIngestPosition(5L, 10L), 1);
+        dataRecord.addColumn(new NormalColumn("id", 1L, 2L, true, true));
+        dataRecord.setCommitTime(123L);
+        Record actualRecord = 
DataRecordResultConvertUtils.convertDataRecordToRecord("logic_db", 
"test_schema", dataRecord);
+        assertThat(actualRecord.getMetaData().getDatabase(), is("logic_db"));
+        assertThat(actualRecord.getMetaData().getSchema(), is("test_schema"));
+        assertThat(actualRecord.getMetaData().getTable(), is("t_user"));
+        assertThat(actualRecord.getTransactionCommitMillis(), is(123L));
+        assertThat(actualRecord.getDataChangeType(), 
is(expectedDataChangeType));
+        
assertThat(actualRecord.getBefore(0).getValue().unpack(Int64Value.class).getValue(),
 is(1L));
+        
assertThat(actualRecord.getAfter(0).getValue().unpack(Int64Value.class).getValue(),
 is(2L));
+    }
+    
+    private static Stream<Arguments> dataChangeTypeTestCases() {
+        return Stream.of(
+                Arguments.of(PipelineSQLOperationType.INSERT, 
Record.DataChangeType.INSERT),
+                Arguments.of(PipelineSQLOperationType.UPDATE, 
Record.DataChangeType.UPDATE),
+                Arguments.of(PipelineSQLOperationType.DELETE, 
Record.DataChangeType.DELETE),
+                Arguments.of(PipelineSQLOperationType.SELECT, 
Record.DataChangeType.UNKNOWN));
     }
 }

Reply via email to