This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8f31a16bbc9 Add more test cases on PipelineCDCSocketSinkTest (#37385)
8f31a16bbc9 is described below
commit 8f31a16bbc9cd86615318cae9b4bd4432845faa0
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 14 22:53:36 2025 +0800
Add more test cases on PipelineCDCSocketSinkTest (#37385)
---
.../importer/sink/PipelineCDCSocketSinkTest.java | 70 ++++++++++++++++++----
1 file changed, 60 insertions(+), 10 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
index d8e1db42d2d..121d98f7f58 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/PipelineCDCSocketSinkTest.java
@@ -18,35 +18,85 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink;
import io.netty.channel.Channel;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
+import
org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import java.util.Arrays;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class PipelineCDCSocketSinkTest {
@Test
- void assertWrite() {
- Channel mockChannel = mock(Channel.class);
- when(mockChannel.isWritable()).thenReturn(false, true);
- when(mockChannel.isActive()).thenReturn(true);
- ShardingSphereDatabase mockDatabase =
mock(ShardingSphereDatabase.class);
- when(mockDatabase.getName()).thenReturn("test");
- try (PipelineCDCSocketSink sink = new
PipelineCDCSocketSink(mockChannel, mockDatabase,
Collections.singletonList("test.t_order"))) {
- PipelineJobUpdateProgress actual = sink.write("ack",
Collections.singletonList(new FinishedRecord(new IngestPlaceholderPosition())));
- assertThat(actual.getProcessedRecordsCount(), is(0));
- actual = sink.write("ack", Collections.singletonList(new
DataRecord(PipelineSQLOperationType.DELETE, "t_order", new
IngestPlaceholderPosition(), 1)));
+ void assertWriteWhenRecordsEmpty() {
+ try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(mock(),
mock(), Arrays.asList("logic.t_order", "t_without_schema"))) {
+ assertThat(sink.write("ack",
Collections.emptyList()).getProcessedRecordsCount(), is(0));
+ }
+ }
+
+ @Test
+ void assertWriteWhenChannelWritable() {
+ Channel channel = mock(Channel.class);
+ when(channel.isWritable()).thenReturn(false, true);
+ when(channel.isActive()).thenReturn(true);
+ ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
+ when(database.getName()).thenReturn("logic_db");
+ DataRecord dataRecord = new
DataRecord(PipelineSQLOperationType.UPDATE, "t_order", new
IngestPlaceholderPosition(), 1);
+ dataRecord.addColumn(new NormalColumn("order_id", 1, 2, true, true));
+ FinishedRecord finishedRecord = new FinishedRecord(new
IngestPlaceholderPosition());
+ try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(channel,
database, Collections.singletonList("logic_schema.t_order"))) {
+ PipelineJobUpdateProgress actual = sink.write("ack",
Arrays.asList(finishedRecord, dataRecord));
assertThat(actual.getProcessedRecordsCount(), is(1));
+ ArgumentCaptor<CDCResponse> responseCaptor =
ArgumentCaptor.forClass(CDCResponse.class);
+ verify(channel).writeAndFlush(responseCaptor.capture());
+ CDCResponse actualResponse = responseCaptor.getValue();
+ DataRecordResult recordResult =
actualResponse.getDataRecordResult();
+ assertThat(recordResult.getAckId(), is("ack"));
+ assertThat(recordResult.getRecordList().size(), is(1));
+ DataRecordResult.Record actualRecord = recordResult.getRecord(0);
+ assertThat(actualRecord.getMetaData().getDatabase(),
is("logic_db"));
+ assertThat(actualRecord.getMetaData().getSchema(),
is("logic_schema"));
+ assertThat(actualRecord.getMetaData().getTable(), is("t_order"));
}
}
+
+ @Test
+ void assertWriteWhenChannelInactive() {
+ Channel channel = mock(Channel.class);
+ DataRecord dataRecord = new
DataRecord(PipelineSQLOperationType.INSERT, "t_order", new
IngestPlaceholderPosition(), 0);
+ try (PipelineCDCSocketSink sink = new PipelineCDCSocketSink(channel,
mock(), Collections.singletonList("logic_schema.t_order"))) {
+ PipelineJobUpdateProgress actual = sink.write("ack",
Collections.singletonList(dataRecord));
+ assertThat(actual.getProcessedRecordsCount(), is(0));
+ verify(channel, never()).writeAndFlush(any());
+ }
+ }
+
+ @Test
+ void assertClose() {
+ Channel channel = mock(Channel.class);
+ new PipelineCDCSocketSink(channel, mock(),
Collections.singletonList("logic_schema.t_order")).close();
+ ArgumentCaptor<CDCResponse> responseCaptor =
ArgumentCaptor.forClass(CDCResponse.class);
+ verify(channel).writeAndFlush(responseCaptor.capture());
+ CDCResponse actualResponse = responseCaptor.getValue();
+ assertThat(actualResponse.getStatus(), is(CDCResponse.Status.FAILED));
+ assertThat(actualResponse.getErrorCode(),
is(XOpenSQLState.GENERAL_ERROR.getValue()));
+ assertThat(actualResponse.getErrorMessage(), is("The socket channel is
closed."));
+ }
}