This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f31a16bbc9 Add more test cases on PipelineCDCSocketSinkTest (#37385)
8f31a16bbc9 is described below

commit 8f31a16bbc9cd86615318cae9b4bd4432845faa0
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 14 22:53:36 2025 +0800

    Add more test cases on PipelineCDCSocketSinkTest (#37385)
---
 .../importer/sink/PipelineCDCSocketSinkTest.java   | 70 ++++++++++++++++++----
 1 file changed, 60 insertions(+), 10 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
index d8e1db42d2d..121d98f7f58 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -18,35 +18,85 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
 
 import io.netty.channel.Channel;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
+import 
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class PipelineCDCSocketSinkTest {
     
     @Test
-    void assertWrite() {
-        Channel mockChannel = mock(Channel.class);
-        when(mockChannel.isWritable()).thenReturn(false, true);
-        when(mockChannel.isActive()).thenReturn(true);
-        ShardingSphereDatabase mockDatabase = 
mock(ShardingSphereDatabase.class);
-        when(mockDatabase.getName()).thenReturn("test");
-        try (PipelineCDCSocketSink sink = new 
PipelineCDCSocketSink(mockChannel, mockDatabase, 
Collections.singletonList("test.t_order"))) {
-            PipelineJobUpdateProgress actual = sink.write("ack", 
Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
-            assertThat(actual.getProcessedRecordsCount(), is(0));
-            actual = sink.write("ack", Collections.singletonList(new 
DataRecord(PipelineSQLOperationType.DELETE, "t_order", new 
IngestPlaceholderPosition(), 1)));
+    void assertWriteWhenRecordsEmpty() {
+        try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mock(), 
mock(), Arrays.asList("logic.t_order", "t_without_schema"))) {
+            assertThat(sink.write("ack", 
Collections.emptyList()).getProcessedRecordsCount(), is(0));
+        }
+    }
+    
+    @Test
+    void assertWriteWhenChannelWritable() {
+        Channel channel = mock(Channel.class);
+        when(channel.isWritable()).thenReturn(false, true);
+        when(channel.isActive()).thenReturn(true);
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+        when(database.getName()).thenReturn("logic_db");
+        DataRecord dataRecord = new 
DataRecord(PipelineSQLOperationType.UPDATE, "t_order", new 
IngestPlaceholderPosition(), 1);
+        dataRecord.addColumn(new NormalColumn("order_id", 1, 2, true, true));
+        FinishedRecord finishedRecord = new FinishedRecord(new 
IngestPlaceholderPosition());
+        try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(channel, 
database, Collections.singletonList("logic_schema.t_order"))) {
+            PipelineJobUpdateProgress actual = sink.write("ack", 
Arrays.asList(finishedRecord, dataRecord));
             assertThat(actual.getProcessedRecordsCount(), is(1));
+            ArgumentCaptor<CDCResponse> responseCaptor = 
ArgumentCaptor.forClass(CDCResponse.class);
+            verify(channel).writeAndFlush(responseCaptor.capture());
+            CDCResponse actualResponse = responseCaptor.getValue();
+            DataRecordResult recordResult = 
actualResponse.getDataRecordResult();
+            assertThat(recordResult.getAckId(), is("ack"));
+            assertThat(recordResult.getRecordList().size(), is(1));
+            DataRecordResult.Record actualRecord = recordResult.getRecord(0);
+            assertThat(actualRecord.getMetaData().getDatabase(), 
is("logic_db"));
+            assertThat(actualRecord.getMetaData().getSchema(), 
is("logic_schema"));
+            assertThat(actualRecord.getMetaData().getTable(), is("t_order"));
         }
     }
+    
+    @Test
+    void assertWriteWhenChannelInactive() {
+        Channel channel = mock(Channel.class);
+        DataRecord dataRecord = new 
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new 
IngestPlaceholderPosition(), 0);
+        try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(channel, 
mock(), Collections.singletonList("logic_schema.t_order"))) {
+            PipelineJobUpdateProgress actual = sink.write("ack", 
Collections.singletonList(dataRecord));
+            assertThat(actual.getProcessedRecordsCount(), is(0));
+            verify(channel, never()).writeAndFlush(any());
+        }
+    }
+    
+    @Test
+    void assertClose() {
+        Channel channel = mock(Channel.class);
+        new PipelineCDCSocketSink(channel, mock(), 
Collections.singletonList("logic_schema.t_order")).close();
+        ArgumentCaptor<CDCResponse> responseCaptor = 
ArgumentCaptor.forClass(CDCResponse.class);
+        verify(channel).writeAndFlush(responseCaptor.capture());
+        CDCResponse actualResponse = responseCaptor.getValue();
+        assertThat(actualResponse.getStatus(), is(CDCResponse.Status.FAILED));
+        assertThat(actualResponse.getErrorCode(), 
is(XOpenSQLState.GENERAL_ERROR.getValue()));
+        assertThat(actualResponse.getErrorMessage(), is("The socket channel is 
closed."));
+    }
 }

Reply via email to