This is an automated email from the ASF dual-hosted git repository.

tolbertam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 77805f510 JAVA-3149: Support request cancellation in request throttler 
patch by Lukasz Antoniak; reviewed by Andy Tolbert and Chris Lohfink for 
JAVA-3149
77805f510 is described below

commit 77805f5103354cadb360384f4f41e0eca73d72f4
Author: Lukasz Antoniak <lukasz.anton...@gmail.com>
AuthorDate: Mon Sep 2 06:44:53 2024 +0200

    JAVA-3149: Support request cancellation in request throttler
    patch by Lukasz Antoniak; reviewed by Andy Tolbert and Chris Lohfink for 
JAVA-3149
---
 .../continuous/ContinuousRequestHandlerBase.java   |  1 +
 .../internal/core/graph/GraphRequestHandler.java   |  1 +
 .../core/session/throttling/RequestThrottler.java  |  8 +++++
 .../internal/core/cql/CqlPrepareHandler.java       |  1 +
 .../internal/core/cql/CqlRequestHandler.java       |  1 +
 .../ConcurrencyLimitingRequestThrottler.java       | 16 ++++++++++
 .../throttling/PassThroughRequestThrottler.java    |  5 ++++
 .../throttling/RateLimitingRequestThrottler.java   | 12 ++++++++
 .../ConcurrencyLimitingRequestThrottlerTest.java   |  5 ++++
 .../RateLimitingRequestThrottlerTest.java          | 13 ++++++++-
 .../oss/driver/core/throttling/ThrottlingIT.java   | 34 +++++++++++++++++-----
 11 files changed, 89 insertions(+), 8 deletions(-)

diff --git 
a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java
 
b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java
index 9a7be3447..0453022cb 100644
--- 
a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java
+++ 
b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java
@@ -410,6 +410,7 @@ public abstract class 
ContinuousRequestHandlerBase<StatementT extends Request, R
 
     cancelScheduledTasks(null);
     cancelGlobalTimeout();
+    throttler.signalCancel(this);
   }
 
   private void cancelGlobalTimeout() {
diff --git 
a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java
 
b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java
index 702da69b8..5c9ceb00d 100644
--- 
a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java
+++ 
b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java
@@ -153,6 +153,7 @@ public class GraphRequestHandler implements Throttled {
           try {
             if (t instanceof CancellationException) {
               cancelScheduledTasks();
+              context.getRequestThrottler().signalCancel(this);
             }
           } catch (Throwable t2) {
             Loggers.warnWithException(LOG, "[{}] Uncaught exception", 
logPrefix, t2);
diff --git 
a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
index cb55fac33..7e2b41ebb 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
@@ -56,4 +56,12 @@ public interface RequestThrottler extends Closeable {
    * perform time-based eviction on pending requests.
    */
   void signalTimeout(@NonNull Throttled request);
+
+  /**
+   * Signals that a request has been cancelled. This indicates to the 
throttler that another request
+   * might be started.
+   */
+  default void signalCancel(@NonNull Throttled request) {
+    // no-op for backward compatibility purposes
+  }
 }
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java
index 8fe1adb20..1ee1f303a 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java
@@ -124,6 +124,7 @@ public class CqlPrepareHandler implements Throttled {
           try {
             if (t instanceof CancellationException) {
               cancelTimeout();
+              context.getRequestThrottler().signalCancel(this);
             }
           } catch (Throwable t2) {
             Loggers.warnWithException(LOG, "[{}] Uncaught exception", 
logPrefix, t2);
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
index a1c6b0e54..0808bdce6 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java
@@ -152,6 +152,7 @@ public class CqlRequestHandler implements Throttled {
           try {
             if (t instanceof CancellationException) {
               cancelScheduledTasks();
+              context.getRequestThrottler().signalCancel(this);
             }
           } catch (Throwable t2) {
             Loggers.warnWithException(LOG, "[{}] Uncaught exception", 
logPrefix, t2);
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
index e8f27467c..438bed095 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
@@ -145,6 +145,22 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
     }
   }
 
+  @Override
+  public void signalCancel(@NonNull Throttled request) {
+    lock.lock();
+    try {
+      if (!closed) {
+        if (queue.remove(request)) { // The request has been cancelled before 
it was active
+          LOG.trace("[{}] Removing cancelled request from the queue", 
logPrefix);
+        } else {
+          onRequestDone();
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @SuppressWarnings("GuardedBy") // this method is only called with the lock 
held
   private void onRequestDone() {
     assert lock.isHeldByCurrentThread();
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/PassThroughRequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/PassThroughRequestThrottler.java
index 714c712a4..2210e4b26 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/PassThroughRequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/PassThroughRequestThrottler.java
@@ -69,6 +69,11 @@ public class PassThroughRequestThrottler implements 
RequestThrottler {
     // nothing to do
   }
 
+  @Override
+  public void signalCancel(@NonNull Throttled request) {
+    // nothing to do
+  }
+
   @Override
   public void close() throws IOException {
     // nothing to do
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottler.java
index 6536804ff..03a693dc0 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottler.java
@@ -198,6 +198,18 @@ public class RateLimitingRequestThrottler implements 
RequestThrottler {
     }
   }
 
+  @Override
+  public void signalCancel(@NonNull Throttled request) {
+    lock.lock();
+    try {
+      if (!closed && queue.remove(request)) { // The request has been 
cancelled before it was active
+        LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void close() {
     lock.lock();
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
index b587ac3da..c01b26c1e 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java
@@ -88,6 +88,11 @@ public class ConcurrencyLimitingRequestThrottlerTest {
     
should_allow_new_request_when_active_one_completes(throttler::signalTimeout);
   }
 
+  @Test
+  public void should_allow_new_request_when_active_one_canceled() {
+    
should_allow_new_request_when_active_one_completes(throttler::signalCancel);
+  }
+
   private void should_allow_new_request_when_active_one_completes(
       Consumer<Throttled> completeCallback) {
     // Given
diff --git 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
index 7336fb447..0e0fe7c1c 100644
--- 
a/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
+++ 
b/core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java
@@ -25,6 +25,7 @@ import 
com.datastax.oss.driver.api.core.RequestThrottlingException;
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
 import com.datastax.oss.driver.api.core.config.DriverConfig;
 import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
+import com.datastax.oss.driver.api.core.session.throttling.Throttled;
 import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
 import com.datastax.oss.driver.internal.core.context.NettyOptions;
 import 
com.datastax.oss.driver.internal.core.util.concurrent.ScheduledTaskCapturingEventLoop;
@@ -33,6 +34,7 @@ import io.netty.channel.EventLoopGroup;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -164,6 +166,15 @@ public class RateLimitingRequestThrottlerTest {
 
   @Test
   public void should_remove_timed_out_request_from_queue() {
+    testRemoveInvalidEventFromQueue(throttler::signalTimeout);
+  }
+
+  @Test
+  public void should_remove_cancel_request_from_queue() {
+    testRemoveInvalidEventFromQueue(throttler::signalCancel);
+  }
+
+  private void testRemoveInvalidEventFromQueue(Consumer<Throttled> 
completeCallback) {
     // Given
     for (int i = 0; i < 5; i++) {
       throttler.register(new MockThrottled());
@@ -174,7 +185,7 @@ public class RateLimitingRequestThrottlerTest {
     throttler.register(queued2);
 
     // When
-    throttler.signalTimeout(queued1);
+    completeCallback.accept(queued1);
 
     // Then
     assertThatStage(queued2.started).isNotDone();
diff --git 
a/integration-tests/src/test/java/com/datastax/oss/driver/core/throttling/ThrottlingIT.java
 
b/integration-tests/src/test/java/com/datastax/oss/driver/core/throttling/ThrottlingIT.java
index a6e7295eb..6fa1a3735 100644
--- 
a/integration-tests/src/test/java/com/datastax/oss/driver/core/throttling/ThrottlingIT.java
+++ 
b/integration-tests/src/test/java/com/datastax/oss/driver/core/throttling/ThrottlingIT.java
@@ -24,13 +24,16 @@ import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.RequestThrottlingException;
 import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
 import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
 import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
 import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
 import com.datastax.oss.driver.categories.ParallelizableTests;
 import 
com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler;
 import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
 import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -39,21 +42,20 @@ import org.junit.experimental.categories.Category;
 public class ThrottlingIT {
 
   private static final String QUERY = "select * from foo";
+  private static final int maxConcurrentRequests = 10;
+  private static final int maxQueueSize = 10;
 
   @Rule public SimulacronRule simulacron = new 
SimulacronRule(ClusterSpec.builder().withNodes(1));
 
-  @Test
-  public void should_reject_request_when_throttling_by_concurrency() {
+  private DriverConfigLoader loader = null;
 
+  @Before
+  public void setUp() {
     // Add a delay so that requests don't complete during the test
     simulacron
         .cluster()
         .prime(PrimeDsl.when(QUERY).then(PrimeDsl.noRows()).delay(5, 
TimeUnit.SECONDS));
-
-    int maxConcurrentRequests = 10;
-    int maxQueueSize = 10;
-
-    DriverConfigLoader loader =
+    loader =
         SessionUtils.configLoaderBuilder()
             .withClass(
                 DefaultDriverOption.REQUEST_THROTTLER_CLASS,
@@ -63,7 +65,10 @@ public class ThrottlingIT {
                 maxConcurrentRequests)
             .withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, 
maxQueueSize)
             .build();
+  }
 
+  @Test
+  public void should_reject_request_when_throttling_by_concurrency() {
     try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {
 
       // Saturate the session and fill the queue
@@ -81,4 +86,19 @@ public class ThrottlingIT {
                   + "(concurrent requests: 10, queue size: 10)");
     }
   }
+
+  @Test
+  public void should_propagate_cancel_to_throttler() {
+    try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {
+
+      // Try to saturate the session and fill the queue
+      for (int i = 0; i < maxConcurrentRequests + maxQueueSize; i++) {
+        CompletionStage<AsyncResultSet> future = session.executeAsync(QUERY);
+        future.toCompletableFuture().cancel(true);
+      }
+
+      // The next query should be successful, because the previous queries 
were cancelled
+      session.execute(QUERY);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to