This is an automated email from the ASF dual-hosted git repository.

tolbertam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 6d3ba4763 Reduce lock held duration in 
ConcurrencyLimitingRequestThrottler
6d3ba4763 is described below

commit 6d3ba47631ebde78460168a2d33c4facde0bd731
Author: Jason Koch <jk...@netflix.com>
AuthorDate: Mon Aug 12 22:52:13 2024 -0700

    Reduce lock held duration in ConcurrencyLimitingRequestThrottler
    
    Callback handling may take some (small) amount of time when a throttled
    request proceeds to submission.
    
    Before this change, the throttler invoked the request's ready callback
    while holding the lock, preventing other tasks from proceeding when there
    is spare capacity and even preventing tasks from enqueuing until the
    callback completes.
    
    By tracking the expected outcome, we can perform the callback outside
    of the lock. This means that request registration and submission can
    proceed even when a long callback is being processed.
    
    patch by Jason Koch; Reviewed by Andy Tolbert and Chris Lohfink for 
CASSANDRA-19922
---
 .../ConcurrencyLimitingRequestThrottler.java       |  39 +++++-
 .../ConcurrencyLimitingRequestThrottlerTest.java   | 143 +++++++++++++++++++--
 .../core/session/throttling/MockThrottled.java     |  30 ++++-
 .../RateLimitingRequestThrottlerTest.java          |  30 ++---
 4 files changed, 206 insertions(+), 36 deletions(-)

diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
index 438bed095..ffe0ffe96 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
@@ -25,6 +25,7 @@ import 
com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
 import com.datastax.oss.driver.api.core.session.throttling.Throttled;
 import 
com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.concurrent.locks.ReentrantLock;
@@ -87,6 +88,8 @@ public class ConcurrencyLimitingRequestThrottler implements 
RequestThrottler {
 
   @Override
   public void register(@NonNull Throttled request) {
+    boolean notifyReadyRequired = false;
+
     lock.lock();
     try {
       if (closed) {
@@ -96,7 +99,7 @@ public class ConcurrencyLimitingRequestThrottler implements 
RequestThrottler {
         // We have capacity for one more concurrent request
         LOG.trace("[{}] Starting newly registered request", logPrefix);
         concurrentRequests += 1;
-        request.onThrottleReady(false);
+        notifyReadyRequired = true;
       } else if (queue.size() < maxQueueSize) {
         LOG.trace("[{}] Enqueuing request", logPrefix);
         queue.add(request);
@@ -112,16 +115,26 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
     } finally {
       lock.unlock();
     }
+
+    // no need to hold the lock while allowing the task to progress
+    if (notifyReadyRequired) {
+      request.onThrottleReady(false);
+    }
   }
 
   @Override
   public void signalSuccess(@NonNull Throttled request) {
+    Throttled nextRequest = null;
     lock.lock();
     try {
-      onRequestDone();
+      nextRequest = onRequestDoneAndDequeNext();
     } finally {
       lock.unlock();
     }
+
+    if (nextRequest != null) {
+      nextRequest.onThrottleReady(true);
+    }
   }
 
   @Override
@@ -131,48 +144,62 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
 
   @Override
   public void signalTimeout(@NonNull Throttled request) {
+    Throttled nextRequest = null;
     lock.lock();
     try {
       if (!closed) {
         if (queue.remove(request)) { // The request timed out before it was 
active
           LOG.trace("[{}] Removing timed out request from the queue", 
logPrefix);
         } else {
-          onRequestDone();
+          nextRequest = onRequestDoneAndDequeNext();
         }
       }
     } finally {
       lock.unlock();
     }
+
+    if (nextRequest != null) {
+      nextRequest.onThrottleReady(true);
+    }
   }
 
   @Override
   public void signalCancel(@NonNull Throttled request) {
+    Throttled nextRequest = null;
     lock.lock();
     try {
       if (!closed) {
         if (queue.remove(request)) { // The request has been cancelled before 
it was active
           LOG.trace("[{}] Removing cancelled request from the queue", 
logPrefix);
         } else {
-          onRequestDone();
+          nextRequest = onRequestDoneAndDequeNext();
         }
       }
     } finally {
       lock.unlock();
     }
+
+    if (nextRequest != null) {
+      nextRequest.onThrottleReady(true);
+    }
   }
 
   @SuppressWarnings("GuardedBy") // this method is only called with the lock 
held
-  private void onRequestDone() {
+  @Nullable
+  private Throttled onRequestDoneAndDequeNext() {
     assert lock.isHeldByCurrentThread();
     if (!closed) {
       if (queue.isEmpty()) {
         concurrentRequests -= 1;
       } else {
         LOG.trace("[{}] Starting dequeued request", logPrefix);
-        queue.poll().onThrottleReady(true);
         // don't touch concurrentRequests since we finished one but started 
another
+        return queue.poll();
       }
     }
+
+    // no next task was dequeued
+    return null;
   }
 
   @Override
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
index c01b26c1e..7eb682070 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
@@ -29,6 +29,7 @@ import com.datastax.oss.driver.api.core.context.DriverContext;
 import com.datastax.oss.driver.api.core.session.throttling.Throttled;
 import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +68,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(request.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     assertThat(throttler.getConcurrentRequests()).isEqualTo(1);
     assertThat(throttler.getQueue()).isEmpty();
   }
@@ -98,7 +99,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     // Given
     MockThrottled first = new MockThrottled();
     throttler.register(first);
-    assertThatStage(first.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(first.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     for (int i = 0; i < 4; i++) { // fill to capacity
       throttler.register(new MockThrottled());
     }
@@ -113,7 +114,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.register(incoming);
 
     // Then
-    assertThatStage(incoming.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(incoming.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
     assertThat(throttler.getQueue()).isEmpty();
   }
@@ -132,7 +133,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.register(incoming);
 
     // Then
-    assertThatStage(incoming.started).isNotDone();
+    assertThatStage(incoming.ended).isNotDone();
     assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
     assertThat(throttler.getQueue()).containsExactly(incoming);
   }
@@ -157,20 +158,20 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     // Given
     MockThrottled first = new MockThrottled();
     throttler.register(first);
-    assertThatStage(first.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(first.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     for (int i = 0; i < 4; i++) {
       throttler.register(new MockThrottled());
     }
 
     MockThrottled incoming = new MockThrottled();
     throttler.register(incoming);
-    assertThatStage(incoming.started).isNotDone();
+    assertThatStage(incoming.ended).isNotDone();
 
     // When
     completeCallback.accept(first);
 
     // Then
-    assertThatStage(incoming.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
+    assertThatStage(incoming.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
     assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
     assertThat(throttler.getQueue()).isEmpty();
   }
@@ -189,7 +190,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.register(incoming);
 
     // Then
-    assertThatStage(incoming.started)
+    assertThatStage(incoming.ended)
         .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
   }
 
@@ -208,7 +209,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.signalTimeout(queued1);
 
     // Then
-    assertThatStage(queued2.started).isNotDone();
+    assertThatStage(queued2.ended).isNotDone();
     assertThat(throttler.getConcurrentRequests()).isEqualTo(5);
     assertThat(throttler.getQueue()).hasSize(1);
   }
@@ -223,7 +224,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     for (int i = 0; i < 10; i++) {
       MockThrottled request = new MockThrottled();
       throttler.register(request);
-      assertThatStage(request.started).isNotDone();
+      assertThatStage(request.ended).isNotDone();
       enqueued.add(request);
     }
 
@@ -232,7 +233,7 @@ public class ConcurrencyLimitingRequestThrottlerTest {
 
     // Then
     for (MockThrottled request : enqueued) {
-      assertThatStage(request.started)
+      assertThatStage(request.ended)
           .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
     }
 
@@ -241,7 +242,125 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started)
+    assertThatStage(request.ended)
         .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
   }
+
+  @Test
+  public void should_run_throttle_callbacks_concurrently() throws 
InterruptedException {
+    // Given
+
+    // a task is enqueued, which when in onThrottleReady, will stall latch 
countDown()ed
+    // register() should automatically start onThrottleReady on same thread
+
+    // start a parallel thread
+    CountDownLatch firstRelease = new CountDownLatch(1);
+    MockThrottled first = new MockThrottled(firstRelease);
+    Runnable r =
+        () -> {
+          throttler.register(first);
+          first.ended.toCompletableFuture().thenRun(() -> 
throttler.signalSuccess(first));
+        };
+    Thread t = new Thread(r);
+    t.start();
+
+    // wait for the registration threads to reach await state
+    assertThatStage(first.started).isSuccess();
+    assertThatStage(first.ended).isNotDone();
+
+    // When
+    // we concurrently submit a second shorter task
+    MockThrottled second = new MockThrottled();
+    // (on a second thread, so that we can join and force a timeout in case
+    // registration is delayed)
+    Thread t2 = new Thread(() -> throttler.register(second));
+    t2.start();
+    t2.join(1_000);
+
+    // Then
+    // registration will trigger callback, should complete ~immediately
+    assertThatStage(second.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    // first should still be unfinished
+    assertThatStage(first.started).isDone();
+    assertThatStage(first.ended).isNotDone();
+    // now finish, and verify
+    firstRelease.countDown();
+    assertThatStage(first.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+
+    t.join(1_000);
+  }
+
+  @Test
+  public void should_enqueue_tasks_quickly_when_callbacks_blocked() throws 
InterruptedException {
+    // Given
+
+    // Multiple tasks are registered, up to the limit, and proceed into their
+    // callback
+
+    // start five parallel threads
+    final int THREADS = 5;
+    Thread[] threads = new Thread[THREADS];
+    CountDownLatch[] latches = new CountDownLatch[THREADS];
+    MockThrottled[] throttled = new MockThrottled[THREADS];
+    for (int i = 0; i < threads.length; i++) {
+      latches[i] = new CountDownLatch(1);
+      final MockThrottled itThrottled = new MockThrottled(latches[i]);
+      throttled[i] = itThrottled;
+      threads[i] =
+          new Thread(
+              () -> {
+                throttler.register(itThrottled);
+                itThrottled
+                    .ended
+                    .toCompletableFuture()
+                    .thenRun(() -> throttler.signalSuccess(itThrottled));
+              });
+      threads[i].start();
+    }
+
+    // wait for the registration threads to be launched
+    // they are all waiting now
+    for (int i = 0; i < throttled.length; i++) {
+      assertThatStage(throttled[i].started).isSuccess();
+      assertThatStage(throttled[i].ended).isNotDone();
+    }
+
+    // When
+    // we concurrently submit another task
+    MockThrottled last = new MockThrottled();
+    throttler.register(last);
+
+    // Then
+    // registration will enqueue the callback, and it should not
+    // take any time to proceed (ie: we should not be blocked)
+    // and there should be an element in the queue
+    assertThatStage(last.started).isNotDone();
+    assertThatStage(last.ended).isNotDone();
+    assertThat(throttler.getQueue()).containsExactly(last);
+
+    // we still have not released, so old throttled threads should be waiting
+    for (int i = 0; i < throttled.length; i++) {
+      assertThatStage(throttled[i].started).isDone();
+      assertThatStage(throttled[i].ended).isNotDone();
+    }
+
+    // now let us release ..
+    for (int i = 0; i < latches.length; i++) {
+      latches[i].countDown();
+    }
+
+    // .. and check everything finished up OK
+    for (int i = 0; i < latches.length; i++) {
+      assertThatStage(throttled[i].started).isSuccess();
+      assertThatStage(throttled[i].ended).isSuccess();
+    }
+
+    // for good measure, we will also wait for the enqueued to complete
+    assertThatStage(last.started).isSuccess();
+    assertThatStage(last.ended).isSuccess();
+
+    for (int i = 0; i < threads.length; i++) {
+      threads[i].join(1_000);
+    }
+  }
 }
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java
index b7cd0ee8a..9e54e3d51 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/MockThrottled.java
@@ -19,21 +19,45 @@ package 
com.datastax.oss.driver.internal.core.session.throttling;
 
 import com.datastax.oss.driver.api.core.RequestThrottlingException;
 import com.datastax.oss.driver.api.core.session.throttling.Throttled;
+import 
com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 
 class MockThrottled implements Throttled {
+  final CompletionStage<Void> started = new CompletableFuture<>();
+  final CompletionStage<Boolean> ended = new CompletableFuture<>();
+  final CountDownLatch canRelease;
 
-  final CompletionStage<Boolean> started = new CompletableFuture<>();
+  public MockThrottled() {
+    this(new CountDownLatch(0));
+  }
+
+  /*
+   * The releaseLatch can be provided to add some delay before the
+   * task readiness/fail callbacks complete. This can be used, eg, to
+   * imitate a slow callback.
+   */
+  public MockThrottled(CountDownLatch releaseLatch) {
+    this.canRelease = releaseLatch;
+  }
 
   @Override
   public void onThrottleReady(boolean wasDelayed) {
-    started.toCompletableFuture().complete(wasDelayed);
+    started.toCompletableFuture().complete(null);
+    awaitRelease();
+    ended.toCompletableFuture().complete(wasDelayed);
   }
 
   @Override
   public void onThrottleFailure(@NonNull RequestThrottlingException error) {
-    started.toCompletableFuture().completeExceptionally(error);
+    started.toCompletableFuture().complete(null);
+    awaitRelease();
+    ended.toCompletableFuture().completeExceptionally(error);
+  }
+
+  private void awaitRelease() {
+    Uninterruptibles.awaitUninterruptibly(canRelease);
   }
 }
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
index 0e0fe7c1c..1e15610bf 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
@@ -98,7 +98,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(request.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     assertThat(throttler.getStoredPermits()).isEqualTo(4);
     assertThat(throttler.getQueue()).isEmpty();
   }
@@ -117,7 +117,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
+    assertThatStage(request.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isFalse());
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).isEmpty();
   }
@@ -136,7 +136,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started).isNotDone();
+    assertThatStage(request.ended).isNotDone();
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).containsExactly(request);
 
@@ -160,7 +160,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started)
+    assertThatStage(request.ended)
         .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
   }
 
@@ -188,7 +188,7 @@ public class RateLimitingRequestThrottlerTest {
     completeCallback.accept(queued1);
 
     // Then
-    assertThatStage(queued2.started).isNotDone();
+    assertThatStage(queued2.ended).isNotDone();
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).containsExactly(queued2);
   }
@@ -202,10 +202,10 @@ public class RateLimitingRequestThrottlerTest {
 
     MockThrottled queued1 = new MockThrottled();
     throttler.register(queued1);
-    assertThatStage(queued1.started).isNotDone();
+    assertThatStage(queued1.ended).isNotDone();
     MockThrottled queued2 = new MockThrottled();
     throttler.register(queued2);
-    assertThatStage(queued2.started).isNotDone();
+    assertThatStage(queued2.ended).isNotDone();
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).hasSize(2);
 
@@ -230,8 +230,8 @@ public class RateLimitingRequestThrottlerTest {
     task.run();
 
     // Then
-    assertThatStage(queued1.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
-    assertThatStage(queued2.started).isNotDone();
+    assertThatStage(queued1.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
+    assertThatStage(queued2.ended).isNotDone();
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).containsExactly(queued2);
     // task reschedules itself since it did not empty the queue
@@ -244,7 +244,7 @@ public class RateLimitingRequestThrottlerTest {
     task.run();
 
     // Then
-    assertThatStage(queued2.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
+    assertThatStage(queued2.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
     assertThat(throttler.getStoredPermits()).isEqualTo(0);
     assertThat(throttler.getQueue()).isEmpty();
     assertThat(adminExecutor.nextTask()).isNull();
@@ -286,14 +286,14 @@ public class RateLimitingRequestThrottlerTest {
     // Then
     MockThrottled queued = new MockThrottled();
     throttler.register(queued);
-    assertThatStage(queued.started).isNotDone();
+    assertThatStage(queued.ended).isNotDone();
 
     // When
     clock.add(ONE_HUNDRED_MILLISECONDS);
     adminExecutor.nextTask().run();
 
     // Then
-    assertThatStage(queued.started).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
+    assertThatStage(queued.ended).isSuccess(wasDelayed -> 
assertThat(wasDelayed).isTrue());
   }
 
   @Test
@@ -306,7 +306,7 @@ public class RateLimitingRequestThrottlerTest {
     for (int i = 0; i < 10; i++) {
       MockThrottled request = new MockThrottled();
       throttler.register(request);
-      assertThatStage(request.started).isNotDone();
+      assertThatStage(request.ended).isNotDone();
       enqueued.add(request);
     }
 
@@ -315,7 +315,7 @@ public class RateLimitingRequestThrottlerTest {
 
     // Then
     for (MockThrottled request : enqueued) {
-      assertThatStage(request.started)
+      assertThatStage(request.ended)
           .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
     }
 
@@ -324,7 +324,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(request);
 
     // Then
-    assertThatStage(request.started)
+    assertThatStage(request.ended)
         .isFailed(error -> 
assertThat(error).isInstanceOf(RequestThrottlingException.class));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to