This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 87eb05269b2 Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349)
87eb05269b2 is described below

commit 87eb05269b2362ef1fbf3e69aae544a15087597f
Author: Hongsheng Zhong <zhonghongsh...@apache.org>
AuthorDate: Sun Dec 10 17:56:51 2023 +0800

    Support SimpleMemoryPipelineChannel use 0 blocking queue size (#29349)
    
    * Support SimpleMemoryPipelineChannel use 0 blocking queue size
    
    * Enable fair for incremental
---
 .../core/ingest/channel/PipelineChannel.java       |  1 +
 .../memory/MemoryPipelineChannelCreator.java       |  4 ++--
 .../memory/SimpleMemoryPipelineChannel.java        |  4 ++--
 .../memory/SimpleMemoryPipelineChannelTest.java    | 25 +++++++++++++++++++---
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
index 333bd92267f..0f5548ac15c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/PipelineChannel.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Pipeline channel.
+ * <p>It supports multiple push threads and one fetch thread.</p>
  */
 public interface PipelineChannel extends Closeable {
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
index 8a0e1704a49..91d2e78231b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/MemoryPipelineChannelCreator.java
@@ -41,8 +41,8 @@ public final class MemoryPipelineChannelCreator implements PipelineChannelCreato
     
     @Override
     public PipelineChannel createPipelineChannel(final int outputConcurrency, final int averageElementSize, final AckCallback ackCallback) {
-        return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel((int) Math.ceil((double) blockQueueSize / averageElementSize), ackCallback)
-                : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize, ackCallback);
+        return 1 == outputConcurrency ? new SimpleMemoryPipelineChannel(blockQueueSize / averageElementSize, ackCallback)
+                : new MultiplexMemoryPipelineChannel(outputConcurrency, blockQueueSize < 1 ? 5 : blockQueueSize, ackCallback);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
index 2389ef4baa3..4c56ef2b68c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannel.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -39,7 +40,7 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel {
     private final AckCallback ackCallback;
     
     public SimpleMemoryPipelineChannel(final int blockQueueSize, final AckCallback ackCallback) {
-        this.queue = new ArrayBlockingQueue<>(blockQueueSize);
+        this.queue = blockQueueSize < 1 ? new SynchronousQueue<>(true) : new ArrayBlockingQueue<>(blockQueueSize, true);
         this.ackCallback = ackCallback;
     }
     
@@ -50,7 +51,6 @@ public final class SimpleMemoryPipelineChannel implements PipelineChannel {
     }
     
     @SneakyThrows(InterruptedException.class)
-    // TODO thread-safe?
     @Override
     public List<Record> fetchRecords(final int batchSize, final long timeout, final TimeUnit timeUnit) {
         List<Record> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
index 57bc8eb56e8..506f3f188d8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/channel/memory/SimpleMemoryPipelineChannelTest.java
@@ -17,24 +17,43 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.EmptyAckCallback;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class SimpleMemoryPipelineChannelTest {
     
+    @SneakyThrows(InterruptedException.class)
+    @Test
+    void assertZeroQueueSizeWorks() {
+        SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(0, new EmptyAckCallback());
+        List<Record> records = Collections.singletonList(new PlaceholderRecord(new FinishedPosition()));
+        Thread thread = new Thread(() -> channel.pushRecords(records));
+        thread.start();
+        assertThat(channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS), is(records));
+        thread.join();
+    }
+    
     @Test
     void assertFetchRecordsTimeoutCorrectly() {
-        SimpleMemoryPipelineChannel simpleMemoryPipelineChannel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
+        SimpleMemoryPipelineChannel channel = new SimpleMemoryPipelineChannel(10, new EmptyAckCallback());
         long startMills = System.currentTimeMillis();
-        simpleMemoryPipelineChannel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
+        channel.fetchRecords(1, 1, TimeUnit.MILLISECONDS);
         long delta = System.currentTimeMillis() - startMills;
         assertTrue(delta >= 1 && delta < 50, "Delta is not in [1,50) : " + delta);
         startMills = System.currentTimeMillis();
-        simpleMemoryPipelineChannel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
+        channel.fetchRecords(1, 500, TimeUnit.MILLISECONDS);
         delta = System.currentTimeMillis() - startMills;
         assertTrue(delta >= 500 && delta < 650, "Delta is not in [500,650) : " + delta);
     }

Reply via email to