This is an automated email from the ASF dual-hosted git repository. zhangliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 87eb05269b2 Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349) 87eb05269b2 is described below commit 87eb05269b2362ef1fbf3e69aae544a15087597f Author: Hongsheng Zhong <zhonghongsh...@apache.org> AuthorDate: Sun Dec 10 17:56:51 2023 +0800 Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349) * Support SimpleMemoryPipelineChannel use 0 blocking queue size * Enable fair for incremental --- .../core/ingest/channel/PipelineChannel.java | 1 + .../memory/MemoryPipelineChannelCreator.java | 4 ++-- .../memory/SimpleMemoryPipelineChannel.java | 4 ++-- .../memory/SimpleMemoryPipelineChannelTest.java | 25 +++++++++++++++++++--- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java index 333bd92267f..0f5548ac15c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; /** * Pipeline channel. + * <p>It supports multiple push threads and one fetch thread.</p> */ public interface PipelineChannel extends Closeable { diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java index 8a0e1704a49..91d2e78231b 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java @@ -41,8 +41,8 @@ public final class MemoryPipelineChannelCreator implements PipelineChannelCreato @Override public PipelineChannel createPipelineChannel(final int outputConcurrency, final int averageElementSize, final AckCallback ackCallback) { - return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel((int) Math.ceil((double) blockQueueSize / averageElementSize), ackCallback) - : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback); + return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel(blockQueueSize / averageElementSize, ackCallback) + : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize < 1 ? 5 : blockQueueSize, ackCallback); } @Override diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java index 2389ef4baa3..4c56ef2b68c 100644 --- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java +++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; /** @@ -39,7 +40,7 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel { private final AckCallback ackCallback; public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback ackCallback) { - this.queue = new ArrayBlockingQueue<>(blockQueueSize); + this.queue = blockQueueSize < 1 ? new SynchronousQueue<>(true) : new ArrayBlockingQueue<>(blockQueueSize, true); this.ackCallback = ackCallback; } @@ -50,7 +51,6 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel { } @SneakyThrows(InterruptedException.class) - // TODO thread-safe? @Override public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) { List<Record> result = new LinkedList<>(); diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java index 57bc8eb56e8..506f3f188d8 100644 --- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java +++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java @@ -17,24 +17,43 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory; +import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback; +import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord; +import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record; import org.junit.jupiter.api.Test; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; class SimpleMemoryPipelineChannelTest { + @SneakyThrows(InterruptedException.class) + @Test + void assertZeroQueueSizeWorks() { + SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, new EmptyAckCallback()); + List<Record> records = Collections.singletonList(new PlaceholderRecord(new FinishedPosition())); + Thread thread = new Thread(() -> channel.pushRecords(records)); + thread.start(); + assertThat(channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS), is(records)); + thread.join(); + } + @Test void assertFetchRecordsTimeoutCorrectly() { - SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback()); + SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback()); long startMills = System.currentTimeMillis(); - simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS); + channel.fetchRecords(1, 1, TimeUnit.MILLISECONDS); long delta = System.currentTimeMillis() - startMills; assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta); startMills = System.currentTimeMillis(); - simpleMemoryPipelineChannel.fetchRecords(1, 500, TimeUnit.MILLISECONDS); + channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS); delta = System.currentTimeMillis() - startMills; assertTrue(delta >= 500 && delta < 650, "Delta is not in [500,650) : " + delta); }