This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9a7ab34a15c Add CDCImporterTest (#37386)
9a7ab34a15c is described below
commit 9a7ab34a15c4df320968f77eadee88d2f5b61fbc
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Dec 15 00:32:10 2025 +0800
Add CDCImporterTest (#37386)
* Add CDCImporterTest
* Add CDCImporterTest
---
.../cdc/core/importer/CDCImporterTest.java | 225 +++++++++++++++++++++
1 file changed, 225 insertions(+)
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterTest.java
new file mode 100644
index 00000000000..48569d7dedb
--- /dev/null
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+
+import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
+import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class CDCImporterTest {
+
+ @Test
+ void assertDoWithoutSortingCoversEmptyNonDataAndData() {
+ List<List<Record>> fetchResults =
Arrays.asList(Collections.emptyList(), asList(mock(Record.class)),
asList(createDataRecord(1L), createFinishedRecord()));
+ PipelineChannel channel = mockNonSortingChannel(fetchResults);
+ PipelineJobProgressListener progressListener =
mock(PipelineJobProgressListener.class);
+ AtomicReference<CDCImporter> importerHolder = new AtomicReference<>();
+ PipelineSink sink = mockSinkWithAck(importerHolder, false);
+ CDCImporter importer = new CDCImporter(new
LinkedList<>(Collections.singletonList(new CDCChannelProgressPair(channel,
progressListener))), 10, 1L, sink, false, null);
+ importerHolder.set(importer);
+ importer.start();
+ importer.stop();
+ assertThat(captureAckRecords(channel, 2).size(), is(2));
+ assertThat(captureProgressCounts(progressListener, 3), contains(0, 0,
1));
+ assertNull(CDCImporterManager.getImporter(importer.getImporterId()));
+ }
+
+ @Test
+ void assertDoWithoutSortingNonDataFinishedRemovesChannel() {
+ PipelineChannel channel =
mockNonSortingChannel(Collections.singletonList(asList(createFinishedRecord())));
+ PipelineJobProgressListener progressListener =
mock(PipelineJobProgressListener.class);
+ PipelineSink sink = mockSimpleSink();
+ CDCImporter importer = new CDCImporter(new
LinkedList<>(Collections.singletonList(new CDCChannelProgressPair(channel,
progressListener))), 1, 1L, sink, false, null);
+ importer.start();
+ importer.stop();
+ assertThat(captureAckRecords(channel, 1).get(0).get(0),
instanceOf(FinishedRecord.class));
+ assertThat(captureProgressCounts(progressListener, 2), contains(0, 0));
+ }
+
+ @Test
+ void assertDoWithoutSortingWithRateLimitAndAck() {
+ PipelineChannel channel =
mockNonSortingChannel(Collections.singletonList(asList(createDataRecord(5L))));
+ PipelineJobProgressListener progressListener =
mock(PipelineJobProgressListener.class);
+ JobRateLimitAlgorithm rateLimitAlgorithm =
mock(JobRateLimitAlgorithm.class);
+ AtomicReference<CDCImporter> importerHolder = new AtomicReference<>();
+ PipelineSink sink = mockSinkWithAck(importerHolder, true);
+ CDCImporter importer = new CDCImporter(new
LinkedList<>(Collections.singletonList(new CDCChannelProgressPair(channel,
progressListener))), 1, 1L, sink, false, rateLimitAlgorithm);
+ importerHolder.set(importer);
+ importer.start();
+ verify(rateLimitAlgorithm).intercept(PipelineSQLOperationType.INSERT,
1);
+ assertThat(captureProgressCounts(progressListener, 2), contains(0, 1));
+ assertThat(captureAckRecords(channel, 1).size(), is(1));
+ }
+
+ @Test
+ void assertDoWithSortingHandlesEmptyAndSingleTransaction() {
+ Queue<List<Record>> transactions = new
LinkedList<>(Arrays.asList(Collections.emptyList(),
asList(createFinishedRecord(), createDataRecord(2L), createFinishedRecord())));
+ PipelineChannel channel = mockSortingChannel(transactions);
+ PipelineJobProgressListener progressListener =
mock(PipelineJobProgressListener.class);
+ AtomicReference<CDCImporter> importerHolder = new AtomicReference<>();
+ PipelineSink sink = mockSinkWithAck(importerHolder, false);
+ CDCImporter importer = new CDCImporter(new
LinkedList<>(Collections.singletonList(new CDCChannelProgressPair(channel,
progressListener))), 1, 1L, sink, true, null);
+ importerHolder.set(importer);
+ importer.start();
+ importer.stop();
+ List<List<Record>> ackedRecords = captureAckRecords(channel, 1);
+ assertThat(ackedRecords.size(), is(1));
+ assertThat(ackedRecords.get(0).get(0),
instanceOf(FinishedRecord.class));
+ assertThat(captureProgressCounts(progressListener, 2), contains(0, 1));
+ }
+
+ @Test
+ void assertDoWithSortingProcessesMultipleCsnBatches() {
+ Queue<List<Record>> channelOneTransactions = new
LinkedList<>(Arrays.asList(asList(createFinishedRecord(),
createDataRecord(4L)), asList(createDataRecord(4L)), Collections.emptyList()));
+ PipelineChannel channelOne =
mockSortingChannel(channelOneTransactions);
+ Queue<List<Record>> channelTwoTransactions = new
LinkedList<>(Arrays.asList(asList(createDataRecord(4L),
createFinishedRecord()), asList(createFinishedRecord()),
Collections.emptyList()));
+ PipelineChannel channelTwo =
mockSortingChannel(channelTwoTransactions);
+ Queue<List<Record>> channelThreeTransactions = new
LinkedList<>(Arrays.asList(asList(createDataRecord(6L)),
asList(createDataRecord(7L))));
+ PipelineChannel channelThree =
mockSortingChannel(channelThreeTransactions);
+ Queue<List<Record>> channelFourTransactions = new
LinkedList<>(Arrays.asList(asList(createFinishedRecord()),
Collections.emptyList()));
+ PipelineChannel channelFour =
mockSortingChannel(channelFourTransactions);
+ List<CDCChannelProgressPair> pairs = new LinkedList<>();
+ pairs.add(new CDCChannelProgressPair(channelOne,
mock(PipelineJobProgressListener.class)));
+ pairs.add(new CDCChannelProgressPair(channelTwo,
mock(PipelineJobProgressListener.class)));
+ pairs.add(new CDCChannelProgressPair(channelThree,
mock(PipelineJobProgressListener.class)));
+ pairs.add(new CDCChannelProgressPair(channelFour,
mock(PipelineJobProgressListener.class)));
+ JobRateLimitAlgorithm rateLimitAlgorithm =
mock(JobRateLimitAlgorithm.class);
+ AtomicReference<CDCImporter> importerHolder = new AtomicReference<>();
+ PipelineSink sink = mockSinkWithAck(importerHolder, true);
+ CDCImporter importer = new CDCImporter(pairs, 2, 1L, sink, true,
rateLimitAlgorithm);
+ importerHolder.set(importer);
+ importer.start();
+ verify(rateLimitAlgorithm).intercept(PipelineSQLOperationType.INSERT,
1);
+ assertThat(captureAckRecords(channelTwo, 2).size(), is(2));
+ assertThat(captureAckRecords(channelFour, 1).size(), is(1));
+ verify(channelThree, never()).ack(any());
+ importer.ack("missing_ack");
+ assertNull(CDCImporterManager.getImporter(importer.getImporterId()));
+ }
+
+ private DataRecord createDataRecord(final long csn) {
+ DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
"t_order", new IngestPlaceholderPosition(), 1);
+ result.setCsn(csn);
+ return result;
+ }
+
+ private FinishedRecord createFinishedRecord() {
+ return new FinishedRecord(new IngestPlaceholderPosition());
+ }
+
+ private List<Record> asList(final Record... records) {
+ return new LinkedList<>(Arrays.asList(records));
+ }
+
+ private PipelineChannel mockNonSortingChannel(final List<List<Record>>
fetchResults) {
+ PipelineChannel result = mock(PipelineChannel.class);
+ Queue<List<Record>> fetchQueue = new LinkedList<>(fetchResults);
+ when(result.fetch(anyInt(), anyLong())).thenAnswer(invocation ->
fetchQueue.isEmpty() ? Collections.emptyList() : fetchQueue.poll());
+ when(result.peek()).thenReturn(Collections.emptyList());
+ when(result.poll()).thenReturn(Collections.emptyList());
+ return result;
+ }
+
+ private PipelineChannel mockSortingChannel(final Queue<List<Record>>
transactions) {
+ PipelineChannel result = mock(PipelineChannel.class);
+ when(result.fetch(anyInt(),
anyLong())).thenReturn(Collections.emptyList());
+ when(result.peek()).thenAnswer(invocation -> {
+ List<Record> records = transactions.peek();
+ return null == records ? Collections.emptyList() : records;
+ });
+ when(result.poll()).thenAnswer(invocation -> {
+ List<Record> records = transactions.poll();
+ return null == records ? Collections.emptyList() : records;
+ });
+ return result;
+ }
+
+ private PipelineSink mockSimpleSink() {
+ PipelineSink result = mock(PipelineSink.class);
+ when(result.write(anyString(), anyCollection())).thenAnswer(invocation
-> {
+ Collection<Record> records = invocation.getArgument(1);
+ return new PipelineJobUpdateProgress(records.size());
+ });
+ return result;
+ }
+
+ private PipelineSink mockSinkWithAck(final AtomicReference<CDCImporter>
importerHolder, final boolean stopAfterWrite) {
+ PipelineSink result = mock(PipelineSink.class);
+ when(result.write(anyString(), anyCollection())).thenAnswer(invocation
-> {
+ String ackId = invocation.getArgument(0);
+ Collection<Record> records = invocation.getArgument(1);
+ importerHolder.get().ack(ackId);
+ if (stopAfterWrite) {
+ importerHolder.get().stop();
+ }
+ return new PipelineJobUpdateProgress(records.size());
+ });
+ return result;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private List<List<Record>> captureAckRecords(final PipelineChannel
channel, final int expectedTimes) {
+ ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+ verify(channel, times(expectedTimes)).ack(captor.capture());
+ return captor.getAllValues().stream().map(each -> {
+ @SuppressWarnings("unchecked")
+ List<Record> records = (List<Record>) each;
+ return records;
+ }).collect(Collectors.toList());
+ }
+
+ private List<Integer> captureProgressCounts(final
PipelineJobProgressListener listener, final int expectedTimes) {
+ ArgumentCaptor<PipelineJobUpdateProgress> captor =
ArgumentCaptor.forClass(PipelineJobUpdateProgress.class);
+ verify(listener,
times(expectedTimes)).onProgressUpdated(captor.capture());
+ return
captor.getAllValues().stream().map(PipelineJobUpdateProgress::getProcessedRecordsCount).collect(Collectors.toList());
+ }
+}