This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 55857fc4ed7 Camel 20158 fix throttle tests (#12233)
55857fc4ed7 is described below

commit 55857fc4ed7191b8ba85e09967593b7157931399
Author: Jono Morris <[email protected]>
AuthorDate: Wed Nov 29 02:51:55 2023 +1300

    Camel 20158 fix throttle tests (#12233)
    
    * CAMEL-20158 use semaphore to count requests
    
    * CAMEL-20158 reenable throttle tests
---
 .../java/org/apache/camel/processor/Throttler.java |  2 +-
 .../org/apache/camel/processor/ThrottlerTest.java  | 48 +++++++++-------------
 2 files changed, 21 insertions(+), 29 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
index b74b145914e..a521808b218 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
@@ -424,7 +424,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
     }
 
     /**
-     * Sets the maximum number of requests per time period expression
+     * Sets the maximum number of concurrent requests.
      */
     public void setMaximumConcurrentRequestsExpression(Expression 
maxConcurrentRequestsExpression) {
         this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index fc2b64f62f7..bfe2b1c5a14 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
 import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -39,8 +39,7 @@ public class ThrottlerTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
     private static final int MESSAGE_COUNT = 9;
     private static final int CONCURRENT_REQUESTS = 2;
-    private volatile int curr;
-    private volatile int max;
+    private Semaphore semaphore;
 
     @Test
     public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
@@ -61,22 +60,20 @@ public class ThrottlerTest extends ContextTestSupport {
         }
     }
 
-    @Disabled("Disabled due to CAMEL-20158")
     @Test
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() 
throws Exception {
+        semaphore = new Semaphore(CONCURRENT_REQUESTS);
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
         sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT, 
resultEndpoint);
-        assertTrue(max <= CONCURRENT_REQUESTS);
     }
 
     @Test
     public void testConfigurationWithConstantExpression() throws Exception {
+        semaphore = new Semaphore(CONCURRENT_REQUESTS);
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
         sendMessagesAndAwaitDelivery(MESSAGE_COUNT, 
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
-        assertTrue(max <= CONCURRENT_REQUESTS);
     }
 
-    @Disabled("Disabled due to CAMEL-20158")
     @Test
     public void testConfigurationWithHeaderExpression() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
@@ -84,37 +81,36 @@ public class ThrottlerTest extends ContextTestSupport {
 
         ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
         try {
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 
CONCURRENT_REQUESTS, INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 
CONCURRENT_REQUESTS, MESSAGE_COUNT);
         } finally {
             executor.shutdownNow();
         }
     }
 
-    @Disabled("Disabled due to CAMEL-20158")
     @Test
     public void testConfigurationWithChangingHeaderExpression() throws 
Exception {
         ExecutorService executor = Executors.newFixedThreadPool(5);
         try {
             MockEndpoint resultEndpoint = 
resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
MESSAGE_COUNT);
             Thread.sleep(INTERVAL); // sleep here to ensure the
                                    // first throttle rate does not
                                    // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
MESSAGE_COUNT);
             Thread.sleep(INTERVAL); // sleep here to ensure the
                                    // first throttle rate does not
                                    // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
MESSAGE_COUNT);
             Thread.sleep(INTERVAL); // sleep here to ensure the
                                    // first throttle rate does not
                                    // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
MESSAGE_COUNT);
         } finally {
             executor.shutdownNow();
         }
@@ -162,13 +158,11 @@ public class ThrottlerTest extends ContextTestSupport {
     }
 
     private void sendMessagesWithHeaderExpression(
-            final ExecutorService executor, final MockEndpoint resultEndpoint, 
final int throttle, final int intervalMs,
-            final int messageCount)
+            final ExecutorService executor, final MockEndpoint resultEndpoint, 
final int throttle, final int messageCount)
             throws InterruptedException {
         resultEndpoint.expectedMessageCount(messageCount);
+        semaphore = new Semaphore(throttle);
 
-        max = 0;
-        long start = System.nanoTime();
         for (int i = 0; i < messageCount; i++) {
             executor.execute(new Runnable() {
                 public void run() {
@@ -180,8 +174,6 @@ public class ThrottlerTest extends ContextTestSupport {
 
         // let's wait for the exchanges to arrive
         resultEndpoint.assertIsSatisfied();
-        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
-        assertTrue(max <= throttle);
     }
 
     private void sendBody(String endpoint) {
@@ -198,31 +190,31 @@ public class ThrottlerTest extends ContextTestSupport {
 
                 from("direct:a").throttle(CONCURRENT_REQUESTS)
                         .process(exchange -> {
-                            curr++;
+                            assertTrue(semaphore.tryAcquire(), "'direct:a' too 
many requests");
                         })
-                        .delay(INTERVAL)
+                        .delay(100)
                         .process(exchange -> {
-                            max = Math.max(max, curr--);
+                            semaphore.release();
                         })
                         .to("log:result", "mock:result");
 
                 
from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS))
                         .process(exchange -> {
-                            curr++;
+                            assertTrue(semaphore.tryAcquire(), 
"'direct:expressionConstant' too many requests");
                         })
-                        .delay(INTERVAL)
+                        .delay(100)
                         .process(exchange -> {
-                            max = Math.max(max, curr--);
+                            semaphore.release();
                         })
                         .to("log:result", "mock:result");
 
                 
from("direct:expressionHeader").throttle(header("throttleValue"))
                         .process(exchange -> {
-                            curr++;
+                            assertTrue(semaphore.tryAcquire(), 
"'direct:expressionHeader' too many requests");
                         })
-                        .delay(INTERVAL)
+                        .delay(100)
                         .process(exchange -> {
-                            max = Math.max(max, curr--);
+                            semaphore.release();
                         })
                         .to("log:result", "mock:result");
 

Reply via email to