This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 89bf464001137400bdfc810f0a7c7bde04371a35
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri Oct 5 10:01:41 2018 +0200

    Introduce interfaces for ConsumerCache and ProducerCache
---
 .../main/java/org/apache/camel/DynamicRouter.java  |   2 +-
 .../main/java/org/apache/camel/RoutingSlip.java    |   2 +-
 .../java/org/apache/camel/spi/ConsumerCache.java   |  55 ++++++++
 .../java/org/apache/camel/spi/ProducerCache.java   |  87 ++++++++++++
 .../src/main/docs/eips/dynamicRouter-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/enrich-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/pollEnrich-eip.adoc  |   2 +-
 .../src/main/docs/eips/recipientList-eip.adoc      |   2 +-
 camel-core/src/main/docs/eips/routingSlip-eip.adoc |   2 +-
 camel-core/src/main/docs/eips/toD-eip.adoc         |   2 +-
 camel-core/src/main/docs/eips/wireTap-eip.adoc     |   2 +-
 ...onsumerCache.java => DefaultConsumerCache.java} |   5 +-
 .../apache/camel/impl/DefaultConsumerTemplate.java |   5 +-
 ...roducerCache.java => DefaultProducerCache.java} |  24 +---
 .../apache/camel/impl/DefaultProducerTemplate.java |   5 +-
 .../DefaultManagementLifecycleStrategy.java        |   4 +-
 .../management/mbean/ManagedConsumerCache.java     |   2 +-
 .../management/mbean/ManagedProducerCache.java     |   2 +-
 .../camel/model/DynamicRouterDefinition.java       |   3 +-
 .../org/apache/camel/model/EnrichDefinition.java   |   3 +-
 .../apache/camel/model/PollEnrichDefinition.java   |   3 +-
 .../apache/camel/model/ProcessorDefinition.java    |   3 +-
 .../camel/model/RecipientListDefinition.java       |   2 +-
 .../apache/camel/model/RoutingSlipDefinition.java  |   2 +-
 .../apache/camel/model/ToDynamicDefinition.java    |   2 +-
 .../org/apache/camel/model/WireTapDefinition.java  |   2 +-
 .../java/org/apache/camel/processor/Enricher.java  |   5 +-
 .../org/apache/camel/processor/PollEnricher.java   |   5 +-
 .../org/apache/camel/processor/RecipientList.java  |   5 +-
 .../camel/processor/RecipientListProcessor.java    |   5 +-
 .../org/apache/camel/processor/RoutingSlip.java    | 148 ++++++++++-----------
 .../camel/processor/SendDynamicProcessor.java      |   5 +-
 .../org/apache/camel/processor/SendProcessor.java  |   5 +-
 .../camel/impl/ConsumerCacheZeroCapacityTest.java  |   2 +-
 .../camel/impl/DefaultConsumerCacheTest.java       |   2 +-
 .../camel/impl/DefaultProducerCacheTest.java       |   6 +-
 .../apache/camel/impl/EmptyProducerCacheTest.java  |   4 +-
 .../camel/impl/ProducerCacheNonSingletonTest.java  |   2 +-
 .../management/ManagedConsumerCacheHitsTest.java   |   4 +-
 .../camel/management/ManagedConsumerCacheTest.java |   4 +-
 .../client/GrpcResponseRouterStreamObserver.java   |   5 +-
 41 files changed, 284 insertions(+), 150 deletions(-)

diff --git a/camel-api/src/main/java/org/apache/camel/DynamicRouter.java 
b/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
index 1e94c4f..278d296 100644
--- a/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
+++ b/camel-api/src/main/java/org/apache/camel/DynamicRouter.java
@@ -60,7 +60,7 @@ public @interface DynamicRouter {
     boolean ignoreInvalidEndpoints() default false;
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this dynamic router, when uris 
are reused.
      */
     int cacheSize() default 0;
diff --git a/camel-api/src/main/java/org/apache/camel/RoutingSlip.java 
b/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
index 2c1dca4..bb996aa 100644
--- a/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
+++ b/camel-api/src/main/java/org/apache/camel/RoutingSlip.java
@@ -60,7 +60,7 @@ public @interface RoutingSlip {
     boolean ignoreInvalidEndpoints() default false;
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this routing slip, when uris 
are reused.
      */
     int cacheSize() default 0;
diff --git a/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java 
b/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java
new file mode 100644
index 0000000..67c505a
--- /dev/null
+++ b/camel-api/src/main/java/org/apache/camel/spi/ConsumerCache.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Service;
+
+public interface ConsumerCache extends Service {
+
+    PollingConsumer acquirePollingConsumer(Endpoint endpoint);
+
+    void releasePollingConsumer(Endpoint endpoint, PollingConsumer 
pollingConsumer);
+
+    Exchange receive(Endpoint endpoint);
+
+    Exchange receive(Endpoint endpoint, long timeout);
+
+    Exchange receiveNoWait(Endpoint endpoint);
+
+    Object getSource();
+
+    int size();
+
+    int getCapacity();
+
+    long getHits();
+
+    long getMisses();
+
+    long getEvicted();
+
+    void resetCacheStatistics();
+
+    void purge();
+
+    void cleanUp();
+
+    EndpointUtilizationStatistics getEndpointUtilizationStatistics();
+}
diff --git a/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java 
b/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java
new file mode 100644
index 0000000..51f7c8e
--- /dev/null
+++ b/camel-api/src/main/java/org/apache/camel/spi/ProducerCache.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Service;
+
+public interface ProducerCache extends Service {
+
+    AsyncProducer acquireProducer(Endpoint endpoint);
+
+    void releaseProducer(Endpoint endpoint, AsyncProducer producer);
+
+    Exchange send(Endpoint endpoint, Exchange exchange, Processor 
resultProcessor);
+
+    CompletableFuture<Exchange> asyncSendExchange(Endpoint endpoint, 
ExchangePattern pattern, Processor processor, Processor resultProcessor, 
Exchange inExchange, CompletableFuture<Exchange> exchangeFuture);
+
+    Object getSource();
+
+    int size();
+
+    int getCapacity();
+
+    long getHits();
+
+    long getMisses();
+
+    long getEvicted();
+
+    void resetCacheStatistics();
+
+    void purge();
+
+    void cleanUp();
+
+    boolean isEventNotifierEnabled();
+
+    void setEventNotifierEnabled(boolean eventNotifierEnabled);
+
+    EndpointUtilizationStatistics getEndpointUtilizationStatistics();
+
+    boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, 
AsyncCallback callback, AsyncProducerCallback asyncProducerCallback);
+
+    /**
+     * Callback for sending a exchange message to a endpoint using an {@link 
AsyncProcessor} capable producer.
+     * <p/>
+     * Using this callback as a template pattern ensures that Camel handles 
the resource handling and will
+     * start and stop the given producer, to avoid resource leaks.
+     *
+     */
+    interface AsyncProducerCallback {
+
+        /**
+         * Performs operation on the given producer to send the given exchange.
+         *
+         * @param asyncProducer   the async producer, is never <tt>null</tt>
+         * @param exchange        the exchange to process
+         * @param callback        the async callback
+         * @return (doneSync) <tt>true</tt> to continue execute synchronously, 
<tt>false</tt> to continue being executed asynchronously
+         */
+        boolean doInAsyncProducer(AsyncProducer asyncProducer, Exchange 
exchange, AsyncCallback callback);
+    }
+
+}
diff --git a/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc 
b/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
index f2e98bb..80ed1d3 100644
--- a/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
+++ b/camel-core/src/main/docs/eips/dynamicRouter-eip.adoc
@@ -23,7 +23,7 @@ The Dynamic Router EIP supports 3 options which are listed 
below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ProducerCache which is used to cache and reuse producers 
when using this dynamic router, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ProducerCache which is used to cache and reuse producers 
when using this dynamic router, when uris are reused. |  | Integer
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/docs/eips/enrich-eip.adoc 
b/camel-core/src/main/docs/eips/enrich-eip.adoc
index 295132f..e2d284b 100644
--- a/camel-core/src/main/docs/eips/enrich-eip.adoc
+++ b/camel-core/src/main/docs/eips/enrich-eip.adoc
@@ -16,7 +16,7 @@ The Enrich EIP supports 7 options which are listed below:
 | *strategyMethodAllowNull* | If this option is false then the aggregate 
method is not used if there was no data to enrich. If this option is true then 
null values is used as the oldExchange (when no data to enrich), when using 
POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method 
is not used if there was an exception thrown while trying to retrieve the data 
to enrich from the resource. Setting this option to true allows end users to 
control what to do if there was an exception in the aggregate method. For 
example to suppress the exception or set a custom message body etc. | false | 
Boolean
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the 
parent and the resource exchange. Enrich will by default not share unit of work 
between the parent exchange and the resource exchange. This means the resource 
exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ProducerCache which is used to cache and reuse producer 
when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ProducerCache which is used to cache and reuse producer 
when uris are reused. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/camel-core/src/main/docs/eips/pollEnrich-eip.adoc 
b/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
index 2d699b4..9207655 100644
--- a/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
+++ b/camel-core/src/main/docs/eips/pollEnrich-eip.adoc
@@ -49,7 +49,7 @@ The Poll Enrich EIP supports 7 options which are listed below:
 | *strategyMethodName* | This option can be used to explicit declare the 
method name to use, when using POJOs as the AggregationStrategy. |  | String
 | *strategyMethodAllowNull* | If this option is false then the aggregate 
method is not used if there was no data to enrich. If this option is true then 
null values is used as the oldExchange (when no data to enrich), when using 
POJOs as the AggregationStrategy. | false | Boolean
 | *aggregateOnException* | If this option is false then the aggregate method 
is not used if there was an exception thrown while trying to retrieve the data 
to enrich from the resource. Setting this option to true allows end users to 
control what to do if there was an exception in the aggregate method. For 
example to suppress the exception or set a custom message body etc. | false | 
Boolean
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ConsumerCache which is used to cache and reuse consumers 
when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ConsumerCache which is used to cache and reuse consumers 
when uris are reused. |  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
 |===
 // eip options: END
diff --git a/camel-core/src/main/docs/eips/recipientList-eip.adoc 
b/camel-core/src/main/docs/eips/recipientList-eip.adoc
index c4377d4..7245644 100644
--- a/camel-core/src/main/docs/eips/recipientList-eip.adoc
+++ b/camel-core/src/main/docs/eips/recipientList-eip.adoc
@@ -26,7 +26,7 @@ The Recipient List EIP supports 15 options which are listed 
below:
 | *timeout* | Sets a total timeout specified in millis, when using parallel 
processing. If the Recipient List hasn't been able to send and process all 
replies within the given timeframe, then the timeout triggers and the Recipient 
List breaks out and continues. Notice if you provide a 
TimeoutAwareAggregationStrategy then the timeout method is invoked before 
breaking out. If the timeout is reached with running tasks still remaining, 
certain tasks for which it is difficult for Camel to shu [...]
 | *onPrepareRef* | Uses the Processor when preparing the 
org.apache.camel.Exchange to be send. This can be used to deep-clone messages 
that should be send, or any custom logic needed before the exchange is send. |  
| String
 | *shareUnitOfWork* | Shares the org.apache.camel.spi.UnitOfWork with the 
parent and each of the sub messages. Recipient List will by default not share 
unit of work between the parent exchange and each recipient exchange. This 
means each sub exchange has its own individual unit of work. | false | Boolean
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ProducerCache which is used to cache and reuse producers 
when using this recipient list, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ProducerCache which is used to cache and reuse producers 
when using this recipient list, when uris are reused. |  | Integer
 | *parallelAggregate* | If enabled then the aggregate method on 
AggregationStrategy can be called concurrently. Notice that this would require 
the implementation of AggregationStrategy to be implemented as thread-safe. By 
default this is false meaning that Camel synchronizes the call to the aggregate 
method. Though in some use-cases this can be used to archive higher performance 
when the AggregationStrategy is implemented as thread-safe. | false | Boolean
 | *stopOnAggregateException* | If enabled, unwind exceptions occurring at 
aggregation time to the error handler when parallelProcessing is used. 
Currently, aggregation time exceptions do not stop the route processing when 
parallelProcessing is used. Enabling this option allows to work around this 
behavior. The default value is false for the sake of backward compatibility. | 
false | Boolean
 |===
diff --git a/camel-core/src/main/docs/eips/routingSlip-eip.adoc 
b/camel-core/src/main/docs/eips/routingSlip-eip.adoc
index 38e37c1..4de7c26 100644
--- a/camel-core/src/main/docs/eips/routingSlip-eip.adoc
+++ b/camel-core/src/main/docs/eips/routingSlip-eip.adoc
@@ -15,7 +15,7 @@ The Routing Slip EIP supports 3 options which are listed 
below:
 | Name | Description | Default | Type
 | *uriDelimiter* | Sets the uri delimiter to use | , | String
 | *ignoreInvalidEndpoints* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ProducerCache which is used to cache and reuse producers 
when using this routing slip, when uris are reused. |  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ProducerCache which is used to cache and reuse producers 
when using this routing slip, when uris are reused. |  | Integer
 |===
 // eip options: END
 
diff --git a/camel-core/src/main/docs/eips/toD-eip.adoc 
b/camel-core/src/main/docs/eips/toD-eip.adoc
index f788864..82a7a5d 100644
--- a/camel-core/src/main/docs/eips/toD-eip.adoc
+++ b/camel-core/src/main/docs/eips/toD-eip.adoc
@@ -17,7 +17,7 @@ The To D EIP supports 5 options which are listed below:
 | Name | Description | Default | Type
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be 
dynamic computed using the org.apache.camel.language.simple.SimpleLanguage 
expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint | 
 | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. 
|  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ConsumerCache which is used to cache and reuse producers. 
|  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if 
they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===
diff --git a/camel-core/src/main/docs/eips/wireTap-eip.adoc 
b/camel-core/src/main/docs/eips/wireTap-eip.adoc
index b9f3a6f..f8473d4 100644
--- a/camel-core/src/main/docs/eips/wireTap-eip.adoc
+++ b/camel-core/src/main/docs/eips/wireTap-eip.adoc
@@ -31,7 +31,7 @@ The Wire Tap EIP supports 11 options which are listed below:
 | *onPrepareRef* | Uses the Processor when preparing the 
org.apache.camel.Exchange to be send. This can be used to deep-clone messages 
that should be send, or any custom logic needed before the exchange is send. |  
| String
 | *uri* | *Required* The uri of the endpoint to send to. The uri can be 
dynamic computed using the org.apache.camel.language.simple.SimpleLanguage 
expression. |  | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint | 
 | ExchangePattern
-| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.impl.ConsumerCache which is used to cache and reuse producers. 
|  | Integer
+| *cacheSize* | Sets the maximum size used by the 
org.apache.camel.spi.ConsumerCache which is used to cache and reuse producers. 
|  | Integer
 | *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try 
to create a producer with that endpoint | false | Boolean
 | *allowOptimisedComponents* | Whether to allow components to optimise toD if 
they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
 |===
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
similarity index 97%
rename from camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
rename to 
camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
index f25f002..6eed897 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerCache.java
@@ -21,6 +21,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateConsumerException;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -29,7 +30,7 @@ import org.apache.camel.support.ServiceSupport;
 /**
  * Cache containing created {@link org.apache.camel.Consumer}.
  */
-public class ConsumerCache extends ServiceSupport {
+public class DefaultConsumerCache extends ServiceSupport implements 
ConsumerCache {
 
     private final CamelContext camelContext;
     private final ServicePool<PollingConsumer> consumers;
@@ -39,7 +40,7 @@ public class ConsumerCache extends ServiceSupport {
     private boolean extendedStatistics;
     private int maxCacheSize;
 
-    public ConsumerCache(Object source, CamelContext camelContext, int 
cacheSize) {
+    public DefaultConsumerCache(Object source, CamelContext camelContext, int 
cacheSize) {
         this.source = source;
         this.camelContext = camelContext;
         this.maxCacheSize = cacheSize == 0 ? 
CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
index ba52514..257d5c3 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumerTemplate.java
@@ -22,6 +22,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -246,7 +247,7 @@ public class DefaultConsumerTemplate extends ServiceSupport 
implements ConsumerT
         return answer;
     }
 
-    private ConsumerCache getConsumerCache() {
+    private org.apache.camel.spi.ConsumerCache getConsumerCache() {
         if (!isStarted()) {
             throw new IllegalStateException("ConsumerTemplate has not been 
started");
         }
@@ -255,7 +256,7 @@ public class DefaultConsumerTemplate extends ServiceSupport 
implements ConsumerT
 
     protected void doStart() throws Exception {
         if (consumerCache == null) {
-            consumerCache = new ConsumerCache(this, camelContext, 
maximumCacheSize);
+            consumerCache = new DefaultConsumerCache(this, camelContext, 
maximumCacheSize);
         }
         ServiceHelper.startService(consumerCache);
     }
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
similarity index 94%
rename from camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
rename to 
camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
index d713775..5638f9d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerCache.java
@@ -31,6 +31,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.SharedCamelInternalProcessor;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.EventHelper;
 import org.apache.camel.support.ServiceHelper;
@@ -40,7 +41,7 @@ import org.apache.camel.util.StopWatch;
 /**
  * Cache containing created {@link Producer}.
  */
-public class ProducerCache extends ServiceSupport {
+public class DefaultProducerCache extends ServiceSupport implements 
ProducerCache {
 
     private final CamelContext camelContext;
     private final ServicePool<AsyncProducer> producers;
@@ -52,7 +53,7 @@ public class ProducerCache extends ServiceSupport {
     private boolean extendedStatistics;
     private int maxCacheSize;
 
-    public ProducerCache(Object source, CamelContext camelContext, int 
cacheSize) {
+    public DefaultProducerCache(Object source, CamelContext camelContext, int 
cacheSize) {
         this.source = source;
         this.camelContext = camelContext;
         this.maxCacheSize = cacheSize == 0 ? 
CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize;
@@ -473,23 +474,4 @@ public class ProducerCache extends ServiceSupport {
         return "ProducerCache for source: " + source + ", capacity: " + 
getCapacity();
     }
 
-    /**
-     * Callback for sending a exchange message to a endpoint using an {@link 
AsyncProcessor} capable producer.
-     * <p/>
-     * Using this callback as a template pattern ensures that Camel handles 
the resource handling and will
-     * start and stop the given producer, to avoid resource leaks.
-     *
-         */
-    public interface AsyncProducerCallback {
-
-        /**
-         * Performs operation on the given producer to send the given exchange.
-         *
-         * @param asyncProducer   the async producer, is never <tt>null</tt>
-         * @param exchange        the exchange to process
-         * @param callback        the async callback
-         * @return (doneSync) <tt>true</tt> to continue execute synchronously, 
<tt>false</tt> to continue being executed asynchronously
-         */
-        boolean doInAsyncProducer(AsyncProducer asyncProducer, Exchange 
exchange, AsyncCallback callback);
-    }
 }
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 9c09cf3..d4dedfe 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -34,6 +34,7 @@ import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.processor.ConvertBodyProcessor;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ExchangeHelper;
@@ -679,7 +680,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
                 });
     }
 
-    private ProducerCache getProducerCache() {
+    private org.apache.camel.spi.ProducerCache getProducerCache() {
         if (!isStarted()) {
             throw new IllegalStateException("ProducerTemplate has not been 
started");
         }
@@ -713,7 +714,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, 
maximumCacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, 
maximumCacheSize);
             producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
         }
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 1a1a7a3..5806f35 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -48,11 +48,9 @@ import org.apache.camel.StartupListener;
 import org.apache.camel.TimerListener;
 import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.cluster.CamelClusterService;
-import org.apache.camel.impl.ConsumerCache;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultEndpointRegistry;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ThrottlingExceptionRoutePolicy;
 import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
 import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
@@ -87,6 +85,7 @@ import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.runtimecatalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.InflightRepository;
@@ -95,6 +94,7 @@ import org.apache.camel.spi.ManagementAgent;
 import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementObjectStrategy;
 import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RestRegistry;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
index 8ccf129..def0b3d 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumerCache.java
@@ -19,7 +19,7 @@ package org.apache.camel.management.mbean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedConsumerCacheMBean;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.spi.ConsumerCache;
 
 @ManagedResource(description = "Managed ConsumerCache")
 public class ManagedConsumerCache extends ManagedService implements 
ManagedConsumerCacheMBean {
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
index e72dc3c..5b2eac5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
@@ -19,7 +19,7 @@ package org.apache.camel.management.mbean;
 import org.apache.camel.CamelContext;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedProducerCacheMBean;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.spi.ProducerCache;
 
 @ManagedResource(description = "Managed ProducerCache")
 public class ManagedProducerCache extends ManagedService implements 
ManagedProducerCacheMBean {
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
index 05a29fa..7f8dda6 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/DynamicRouterDefinition.java
@@ -28,6 +28,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.DynamicRouter;
 import org.apache.camel.spi.Metadata;
@@ -169,7 +170,7 @@ public class DynamicRouterDefinition<Type extends 
ProcessorDefinition<Type>> ext
     }
     
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link DefaultProducerCache} which is 
used
      * to cache and reuse producers when using this dynamic router, when uris 
are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index f54b9fd..69ee984 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -187,7 +188,7 @@ public class EnrichDefinition extends 
NoOutputExpressionNode {
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link DefaultProducerCache} which is 
used
      * to cache and reuse producer when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 667832e..87647d0 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.PollEnricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -200,7 +201,7 @@ public class PollEnrichDefinition extends 
NoOutputExpressionNode {
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ConsumerCache} which is used
+     * Sets the maximum size used by the {@link DefaultConsumerCache} which is 
used
      * to cache and reuse consumers when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 2d6e282..9094494 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -57,7 +57,6 @@ import org.apache.camel.model.dataformat.CustomDataFormat;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
-import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
@@ -635,7 +634,7 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * Sends the exchange to the given dynamic endpoint
      *
      * @param uri  the dynamic endpoint to send to (resolved using simple 
language by default)
-     * @param cacheSize sets the maximum size used by the {@link 
org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers.
+     * @param cacheSize sets the maximum size used by the {@link 
org.apache.camel.spi.ConsumerCache} which is used to cache and reuse producers.
      *
      * @return the builder
      */
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index 9a4e945..48bc007 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -456,7 +456,7 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this recipient list, when uris 
are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
index d5caaef..b8495ce 100644
--- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java
@@ -182,7 +182,7 @@ public class RoutingSlipDefinition<Type extends 
ProcessorDefinition<Type>> exten
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers when using this routing slip, when uris 
are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index 35065fe..11aea0b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -151,7 +151,7 @@ public class ToDynamicDefinition extends 
NoOutputDefinition<ToDynamicDefinition>
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ConsumerCache} which is used to cache and reuse producers.
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ConsumerCache} which is used to cache and reuse producers.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
      * @return the builder
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index a1aab13..d152cf4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -307,7 +307,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends T
     }
 
     /**
-     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * Sets the maximum size used by the {@link 
org.apache.camel.spi.ProducerCache} which is used
      * to cache and reuse producers, when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index 35b26ac..13688f6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -26,10 +26,11 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -342,7 +343,7 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, IdAware,
         }
 
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, 
cacheSize);
             log.debug("Enricher {} using ProducerCache with cacheSize={}", 
this, producerCache.getCapacity());
         }
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java 
b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 08a4a96..28842c6 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -26,8 +26,9 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.PollingConsumer;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ConsumerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
@@ -369,7 +370,7 @@ public class PollEnricher extends ServiceSupport implements 
AsyncProcessor, IdAw
     protected void doStart() throws Exception {
         if (consumerCache == null) {
             // create consumer cache if we use dynamic expressions for 
computing the endpoints to poll
-            consumerCache = new ConsumerCache(this, camelContext, cacheSize);
+            consumerCache = new DefaultConsumerCache(this, camelContext, 
cacheSize);
             log.debug("PollEnrich {} using ConsumerCache with cacheSize={}", 
this, cacheSize);
         }
         if (aggregationStrategy instanceof CamelContextAware) {
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index 76630e3..10dcd43 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -26,11 +26,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ObjectHelper;
@@ -181,7 +182,7 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor, IdA
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, 
cacheSize);
             log.debug("RecipientList {} using ProducerCache with 
cacheSize={}", this, producerCache.getCapacity());
         }
         ServiceHelper.startService(aggregationStrategy, producerCache);
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 67fc170..77d4401 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -31,8 +31,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.EndpointHelper;
@@ -265,7 +266,7 @@ public class RecipientListProcessor extends 
MulticastProcessor {
     protected void doStart() throws Exception {
         super.doStart();
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, getCamelContext(), 0);
+            producerCache = new DefaultProducerCache(this, getCamelContext(), 
0);
         }
         ServiceHelper.startService(producerCache);
     }
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java 
b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
index a2decf7..05e9a1e 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -29,10 +29,10 @@ import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.Message;
 import org.apache.camel.Traceable;
 import org.apache.camel.builder.ExpressionBuilder;
-import org.apache.camel.impl.ProducerCache;
-import org.apache.camel.impl.ProducerCache.AsyncProducerCallback;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -345,98 +345,96 @@ public class RoutingSlip extends ServiceSupport 
implements AsyncProcessor, Trace
                 originalCallback.done(false);
             }
         };
-        return producerCache.doInAsyncProducer(endpoint, exchange, callback, 
new AsyncProducerCallback() {
-            public boolean doInAsyncProducer(AsyncProducer asyncProducer, 
Exchange exchange, final AsyncCallback callback) {
-
-                // rework error handling to support fine grained error handling
-                RouteContext routeContext = exchange.getUnitOfWork() != null ? 
exchange.getUnitOfWork().getRouteContext() : null;
-                AsyncProcessor target = createErrorHandler(routeContext, 
exchange, asyncProducer, endpoint);
-
-                // set property which endpoint we send to and the producer 
that can do it
-                exchange.setProperty(Exchange.TO_ENDPOINT, 
endpoint.getEndpointUri());
-                exchange.setProperty(Exchange.SLIP_ENDPOINT, 
endpoint.getEndpointUri());
-                exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer);
-
-                return target.process(exchange, new AsyncCallback() {
-                    public void done(boolean doneSync) {
-                        // cleanup producer after usage
-                        exchange.removeProperty(Exchange.SLIP_PRODUCER);
-
-                        // we only have to handle async completion of the 
routing slip
-                        if (doneSync) {
-                            callback.done(true);
-                            return;
-                        }
-
-                        try {
-                            // continue processing the routing slip 
asynchronously
-                            Exchange current = 
prepareExchangeForRoutingSlip(exchange, endpoint);
+        return producerCache.doInAsyncProducer(endpoint, exchange, callback, 
(p, ex, cb) -> {
+
+            // rework error handling to support fine grained error handling
+            RouteContext routeContext = ex.getUnitOfWork() != null ? 
ex.getUnitOfWork().getRouteContext() : null;
+            AsyncProcessor target = createErrorHandler(routeContext, ex, p, 
endpoint);
+
+            // set property which endpoint we send to and the producer that 
can do it
+            ex.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
+            ex.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
+            ex.setProperty(Exchange.SLIP_PRODUCER, p);
+
+            return target.process(ex, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    // cleanup producer after usage
+                    ex.removeProperty(Exchange.SLIP_PRODUCER);
+
+                    // we only have to handle async completion of the routing 
slip
+                    if (doneSync) {
+                        cb.done(true);
+                        return;
+                    }
 
-                            while (iter.hasNext(current)) {
+                    try {
+                        // continue processing the routing slip asynchronously
+                        Exchange current = prepareExchangeForRoutingSlip(ex, 
endpoint);
 
-                                // we ignore some kind of exceptions and allow 
us to continue
-                                if (isIgnoreInvalidEndpoints()) {
-                                    FailedToCreateProducerException e = 
current.getException(FailedToCreateProducerException.class);
-                                    if (e != null) {
-                                        if (log.isDebugEnabled()) {
-                                            log.debug("Endpoint uri is 
invalid: " + endpoint + ". This exception will be ignored.", e);
-                                        }
-                                        current.setException(null);
-                                    }
-                                }
-
-                                // Decide whether to continue with the 
recipients or not; similar logic to the Pipeline
-                                // check for error if so we should break out
-                                if (!continueProcessing(current, "so breaking 
out of the routing slip", log)) {
-                                    break;
-                                }
+                        while (iter.hasNext(current)) {
 
-                                Endpoint endpoint;
-                                try {
-                                    endpoint = resolveEndpoint(iter, exchange);
-                                    // if no endpoint was resolved then try 
the next
-                                    if (endpoint == null) {
-                                        continue;
+                            // we ignore some kind of exceptions and allow us 
to continue
+                            if (isIgnoreInvalidEndpoints()) {
+                                FailedToCreateProducerException e = 
current.getException(FailedToCreateProducerException.class);
+                                if (e != null) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Endpoint uri is invalid: " 
+ endpoint + ". This exception will be ignored.", e);
                                     }
-                                } catch (Exception e) {
-                                    // error resolving endpoint so we should 
break out
-                                    exchange.setException(e);
-                                    break;
+                                    current.setException(null);
                                 }
+                            }
 
-                                // prepare and process the routing slip
-                                boolean sync = processExchange(endpoint, 
current, original, callback, iter);
-                                current = 
prepareExchangeForRoutingSlip(current, endpoint);
+                            // Decide whether to continue with the recipients 
or not; similar logic to the Pipeline
+                            // check for error if so we should break out
+                            if (!continueProcessing(current, "so breaking out 
of the routing slip", log)) {
+                                break;
+                            }
 
-                                if (!sync) {
-                                    log.trace("Processing exchangeId: {} is 
continued being processed asynchronously", original.getExchangeId());
-                                    return;
+                            Endpoint endpoint1;
+                            try {
+                                endpoint1 = resolveEndpoint(iter, ex);
+                                // if no endpoint was resolved then try the 
next
+                                if (endpoint1 == null) {
+                                    continue;
                                 }
+                            } catch (Exception e) {
+                                // error resolving endpoint so we should break 
out
+                                ex.setException(e);
+                                break;
                             }
 
-                            // logging nextExchange as it contains the 
exchange that might have altered the payload and since
-                            // we are logging the completion if will be 
confusing if we log the original instead
-                            // we could also consider logging the original and 
the nextExchange then we have *before* and *after* snapshots
-                            log.trace("Processing complete for exchangeId: {} 
>>> {}", original.getExchangeId(), current);
+                            // prepare and process the routing slip
+                            boolean sync = processExchange(endpoint1, current, 
original, cb, iter);
+                            current = prepareExchangeForRoutingSlip(current, 
endpoint1);
 
-                            // copy results back to the original exchange
-                            ExchangeHelper.copyResults(original, current);
-                        } catch (Throwable e) {
-                            exchange.setException(e);
+                            if (!sync) {
+                                log.trace("Processing exchangeId: {} is 
continued being processed asynchronously", original.getExchangeId());
+                                return;
+                            }
                         }
 
-                        // okay we are completely done with the routing slip
-                        // so we need to signal done on the original callback 
so it can continue
-                        originalCallback.done(false);
+                        // logging nextExchange as it contains the exchange 
that might have altered the payload and since
+                        // we are logging the completion if will be confusing 
if we log the original instead
+                        // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+                        log.trace("Processing complete for exchangeId: {} >>> 
{}", original.getExchangeId(), current);
+
+                        // copy results back to the original exchange
+                        ExchangeHelper.copyResults(original, current);
+                    } catch (Throwable e) {
+                        ex.setException(e);
                     }
-                });
-            }
+
+                    // okay we are completely done with the routing slip
+                    // so we need to signal done on the original callback so 
it can continue
+                    originalCallback.done(false);
+                }
+            });
         });
     }
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, 
cacheSize);
             log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", 
this, producerCache.getCapacity());
         }
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 3879226..1499e88 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -27,9 +27,10 @@ import org.apache.camel.Expression;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
 import org.apache.camel.ResolveEndpointFailedException;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.SendDynamicAware;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.EndpointHelper;
@@ -246,7 +247,7 @@ public class SendDynamicProcessor extends ServiceSupport 
implements AsyncProcess
 
     protected void doStart() throws Exception {
         if (producerCache == null) {
-            producerCache = new ProducerCache(this, camelContext, cacheSize);
+            producerCache = new DefaultProducerCache(this, camelContext, 
cacheSize);
             log.debug("DynamicSendTo {} using ProducerCache with 
cacheSize={}", this, producerCache.getCapacity());
         }
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
index c694231..b641ee5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -29,8 +29,9 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.InterceptSendToEndpoint;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.AsyncProcessorHelper;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.EventHelper;
@@ -203,7 +204,7 @@ public class SendProcessor extends ServiceSupport 
implements AsyncProcessor, Tra
             // and use a regular HashMap as we do not want a soft reference 
store that may get re-claimed when low on memory
             // as we want to ensure the producer is kept around, to ensure its 
lifecycle is fully managed,
             // eg stopping the producer when we stop etc.
-            producerCache = new ProducerCache(this, camelContext, 1);
+            producerCache = new DefaultProducerCache(this, camelContext, 1);
             // do not add as service as we do not want to manage the producer 
cache
         }
         ServiceHelper.startService(producerCache);
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
index a4f8a3f..8fccba7 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/ConsumerCacheZeroCapacityTest.java
@@ -30,7 +30,7 @@ public class ConsumerCacheZeroCapacityTest extends 
ContextTestSupport {
 
     @Test
     public void testConsumerCacheZeroCapacity() throws Exception {
-        ConsumerCache cache = new ConsumerCache(this, context, -1);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 
-1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
index b1a792d..7e199fb 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultConsumerCacheTest.java
@@ -25,7 +25,7 @@ public class DefaultConsumerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testCacheConsumers() throws Exception {
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 
0);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
index c7c42d2..c22d7ac 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerCacheTest.java
@@ -42,7 +42,7 @@ public class DefaultProducerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testCacheProducerAcquireAndRelease() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 0);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
0);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -64,7 +64,7 @@ public class DefaultProducerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testCacheStopExpired() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 5);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
5);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -96,7 +96,7 @@ public class DefaultProducerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testExtendedStatistics() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, 5);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
5);
         cache.setExtendedStatistics(true);
         cache.start();
 
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
index de233cd..f1fc61e 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java
@@ -25,7 +25,7 @@ public class EmptyProducerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testEmptyCache() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
-1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
@@ -45,7 +45,7 @@ public class EmptyProducerCacheTest extends 
ContextTestSupport {
 
     @Test
     public void testCacheProducerAcquireAndRelease() throws Exception {
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
-1);
         cache.start();
 
         assertEquals("Size should be 0", 0, cache.size());
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
index 41d9e7b..0aba8a6 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/ProducerCacheNonSingletonTest.java
@@ -41,7 +41,7 @@ public class ProducerCacheNonSingletonTest extends 
ContextTestSupport {
     public void testNonSingleton() throws Exception {
         context.addComponent("dummy", new MyDummyComponent());
 
-        ProducerCache cache = new ProducerCache(this, context, -1);
+        DefaultProducerCache cache = new DefaultProducerCache(this, context, 
-1);
         cache.start();
 
         Endpoint endpoint = context.getEndpoint("dummy:foo");
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
index cb34c1c..50dd9f8 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheHitsTest.java
@@ -25,7 +25,7 @@ import javax.management.ObjectName;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.junit.Test;
 
 public class ManagedConsumerCacheHitsTest extends ManagementTestSupport {
@@ -40,7 +40,7 @@ public class ManagedConsumerCacheHitsTest extends 
ManagementTestSupport {
         // always register services in JMX so we can enlist our consumer 
template/cache
         
context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
 
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 
0);
         context.addService(cache);
 
         template.sendBody("seda:a", "Hello World");
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
 
b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
index 2bc8ed5..d3349af 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/ManagedConsumerCacheTest.java
@@ -27,7 +27,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.ConsumerCache;
+import org.apache.camel.impl.DefaultConsumerCache;
 import org.junit.Test;
 
 public class ManagedConsumerCacheTest extends ManagementTestSupport {
@@ -42,7 +42,7 @@ public class ManagedConsumerCacheTest extends 
ManagementTestSupport {
         // always register services in JMX so we can enlist our consumer 
template/cache
         
context.getManagementStrategy().getManagementAgent().setRegisterAlways(true);
 
-        ConsumerCache cache = new ConsumerCache(this, context, 0);
+        DefaultConsumerCache cache = new DefaultConsumerCache(this, context, 
0);
         context.addService(cache);
 
         template.sendBody("direct:start", "Hello World");
diff --git 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
index 74d57fa..c485ea9 100644
--- 
a/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
+++ 
b/components/camel-grpc/src/main/java/org/apache/camel/component/grpc/client/GrpcResponseRouterStreamObserver.java
@@ -22,7 +22,8 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.grpc.GrpcConfiguration;
 import org.apache.camel.component.grpc.GrpcConstants;
-import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.impl.DefaultProducerCache;
+import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.support.CamelContextHelper;
 
 /**
@@ -40,7 +41,7 @@ public class GrpcResponseRouterStreamObserver implements 
StreamObserver<Object>
         this.sourceEndpoint = sourceEndpoint;
         this.endpoint = 
CamelContextHelper.getMandatoryEndpoint(sourceEndpoint.getCamelContext(), 
configuration.getStreamRepliesTo());
         sourceEndpoint.getCamelContext().createProducerTemplate(-1);
-        this.producerCache = new ProducerCache(this, 
sourceEndpoint.getCamelContext(), -1);
+        this.producerCache = new DefaultProducerCache(this, 
sourceEndpoint.getCamelContext(), -1);
     }
 
     @Override

Reply via email to