This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new f2ebcab8ba3 Add test cases on MemoryPipelineChannel (#33315) f2ebcab8ba3 is described below commit f2ebcab8ba3a34dc62019c7be78a7c7b71d77758 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Sat Oct 19 11:41:09 2024 +0800 Add test cases on MemoryPipelineChannel (#33315) --- .../channel/memory/MemoryPipelineChannelTest.java | 36 ++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java index 4d6f941fe34..74a5afb9b63 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/channel/memory/MemoryPipelineChannelTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; class MemoryPipelineChannelTest { @@ -58,4 +60,38 @@ class MemoryPipelineChannelTest { channel.push(records); assertThat(channel.fetch(10, 0L), is(records)); } + + @Test + void assertPeekWithRecords() { + MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())); + List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); + channel.push(records); + assertThat(channel.peek(), is(records)); + } + + @Test + void assertPeekWithoutRecords() { + assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).peek(), is(Collections.emptyList())); + } + + @Test + void assertPollWithRecords() { + MemoryPipelineChannel channel = new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())); + List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); + channel.push(records); + assertThat(channel.poll(), is(records)); + } + + @Test + void assertPollWithoutRecords() { + assertThat(new MemoryPipelineChannel(100, new InventoryTaskAckCallback(new AtomicReference<>())).poll(), is(Collections.emptyList())); + } + + @Test + void assertAck() { + InventoryTaskAckCallback callback = mock(InventoryTaskAckCallback.class); + List<Record> records = Collections.singletonList(new PlaceholderRecord(new IngestFinishedPosition())); + new MemoryPipelineChannel(100, callback).ack(records); + verify(callback).onAck(records); + } }