This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch sandbox/camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 387216d38b6b4ae3d300db7e4c23be40200caea7 Author: Guillaume Nodet <[email protected]> AuthorDate: Fri Oct 5 15:24:27 2018 +0200 Isolate o.a.c.management from o.a.c.impl --- .../main/java/org/apache/camel/CamelContext.java | 5 ++ .../src/main/java/org/apache/camel/Consumer.java | 3 + .../src/main/java/org/apache/camel/Route.java | 7 +++ .../camel}/cluster/ClusterServiceHelper.java | 3 +- .../camel}/cluster/ClusterServiceSelectors.java | 3 +- .../org/apache/camel/support/ServiceHelper.java | 4 +- .../org/apache/camel/builder/RouteBuilder.java | 21 ------- .../camel/component/seda/SedaPollingConsumer.java | 6 ++ .../impl/cluster/ClusteredRouteController.java | 2 + .../camel/impl/cluster/ClusteredRoutePolicy.java | 4 +- .../impl/cluster/ClusteredRoutePolicyFactory.java | 1 + .../DefaultManagementLifecycleStrategy.java | 64 +++++++++++----------- .../InstrumentationInterceptStrategy.java | 13 ++--- .../management/mbean/ManagedClusterService.java | 2 +- .../ManagedThrottlingExceptionRoutePolicy.java | 4 +- .../ManagedThrottlingInflightRoutePolicy.java | 2 +- .../camel/support/EventDrivenPollingConsumer.java | 5 ++ .../camel/support/PollingConsumerSupport.java | 6 ++ .../camel/support/ProcessorPollingConsumer.java | 5 ++ .../apache/camel/support/RoutePolicySupport.java | 41 ++++---------- .../ThrottlingExceptionHalfOpenHandler.java | 2 +- .../ThrottlingExceptionRoutePolicy.java | 4 +- .../ThrottlingInflightRoutePolicy.java | 5 +- .../camel/component/test/TestEndpointTest.java | 5 ++ .../impl/cluster/ClusterServiceSelectorTest.java | 5 +- .../ManagedThrottlingExceptionRoutePolicyTest.java | 4 +- .../ManagedThrottlingInflightRoutePolicyTest.java | 2 +- ...xceptionRoutePolicyHalfOpenHandlerSedaTest.java | 4 +- ...ingExceptionRoutePolicyHalfOpenHandlerTest.java | 4 +- ...ThrottlingExceptionRoutePolicyHalfOpenTest.java | 2 +- ...lingExceptionRoutePolicyKeepOpenOnInitTest.java | 2 +- ...tlingExceptionRoutePolicyOpenViaConfigTest.java | 2 +- .../ThrottlingExceptionRoutePolicyTest.java | 4 +- .../ThrottlingInflightRoutePolicyTest.java | 2 +- .../component/disruptor/DisruptorConsumer.java | 5 ++ .../jms/JmsThrottlingInflightRoutePolicyTest.java | 2 +- .../tx/JMSTransactionThrottlingRoutePolicyTest.xml | 2 +- .../camel/component/master/MasterComponent.java | 2 +- .../quartz2/MultiplePoliciesOnRouteTest.java | 2 +- .../camel/routepolicy/quartz2/MultiplePolicies.xml | 2 +- .../BackpressurePublisherRoutePolicyTest.java | 2 +- .../ThrottlingInflightRoutePolicyTest.xml | 2 +- .../component/connector/ConnectorProducer.java | 2 +- .../resources/META-INF/spring/camel-server.xml | 2 +- 44 files changed, 138 insertions(+), 133 deletions(-) diff --git a/camel-api/src/main/java/org/apache/camel/CamelContext.java b/camel-api/src/main/java/org/apache/camel/CamelContext.java index 741475c..704624b 100644 --- a/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -179,6 +179,11 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { String getManagementName(); /** + * Sets the name this {@link CamelContext} will be registered in JMX. + */ + void setManagementName(String name); + + /** * Gets the version of the this CamelContext. * * @return the version diff --git a/camel-api/src/main/java/org/apache/camel/Consumer.java b/camel-api/src/main/java/org/apache/camel/Consumer.java index 89def42..825fe6a 100644 --- a/camel-api/src/main/java/org/apache/camel/Consumer.java +++ b/camel-api/src/main/java/org/apache/camel/Consumer.java @@ -20,4 +20,7 @@ package org.apache.camel; * A consumer of message exchanges from an {@link Endpoint} */ public interface Consumer extends Service, EndpointAware { + + Processor getProcessor(); + } diff --git a/camel-api/src/main/java/org/apache/camel/Route.java b/camel-api/src/main/java/org/apache/camel/Route.java index 53fddce..23c46e0 100644 --- a/camel-api/src/main/java/org/apache/camel/Route.java +++ b/camel-api/src/main/java/org/apache/camel/Route.java @@ -74,6 +74,13 @@ public interface Route extends EndpointAware { Consumer getConsumer(); /** + * Gets the {@link Processor} + * + * @return the processor + */ + Processor getProcessor(); + + /** * Whether or not the route supports suspension (suspend and resume) * * @return <tt>true</tt> if this route supports suspension diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java similarity index 95% rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java index 22b7706..1571e0f 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceHelper.java +++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceHelper.java @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl.cluster; +package org.apache.camel.cluster; import java.util.Optional; import java.util.Set; import org.apache.camel.CamelContext; -import org.apache.camel.cluster.CamelClusterService; import org.apache.camel.util.ObjectHelper; public final class ClusterServiceHelper { diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java similarity index 98% rename from camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java rename to camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java index aa86a5e..485f648 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusterServiceSelectors.java +++ b/camel-api/src/main/java/org/apache/camel/cluster/ClusterServiceSelectors.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl.cluster; +package org.apache.camel.cluster; import java.util.Collection; import java.util.Comparator; @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import org.apache.camel.cluster.CamelClusterService; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java index 9ef50b6..111eb37 100644 --- a/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java +++ b/camel-api/src/main/java/org/apache/camel/support/ServiceHelper.java @@ -268,10 +268,10 @@ public final class ServiceHelper { * * @param service the service * @return <tt>true</tt> if either <tt>resume</tt> method or - * {@link #startService(Service)} was called, <tt>false</tt> + * {@link #startService(Object)} was called, <tt>false</tt> * otherwise. * @throws Exception is thrown if error occurred - * @see #startService(Service) + * @see #startService(Object) */ public static boolean resumeService(Object service) throws Exception { if (service instanceof Suspendable && service instanceof SuspendableService) { diff --git a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java index 940c4d7..d5349f2 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java +++ b/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java @@ -27,7 +27,6 @@ import org.apache.camel.Component; import org.apache.camel.Endpoint; import org.apache.camel.RoutesBuilder; import org.apache.camel.component.properties.PropertiesComponent; -import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.model.FromDefinition; import org.apache.camel.model.InterceptDefinition; import org.apache.camel.model.InterceptFromDefinition; @@ -366,17 +365,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild return getRouteCollection().onCompletion(); } - // Properties - // ----------------------------------------------------------------------- - public ModelCamelContext getContext() { - ModelCamelContext context = super.getContext(); - if (context == null) { - context = createContainer(); - setContext(context); - } - return context; - } - public void addRoutesToCamelContext(CamelContext context) throws Exception { // must configure routes before rests configureRoutes((ModelCamelContext) context); @@ -568,15 +556,6 @@ public abstract class RouteBuilder extends BuilderSupport implements RoutesBuild return this.routeCollection; } - /** - * Factory method - * - * @return the CamelContext - */ - protected ModelCamelContext createContainer() { - return new DefaultCamelContext(); - } - protected void configureRest(RestDefinition rest) { // noop } diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java index ae8ae38..6abbd5d 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaPollingConsumer.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.IsSingleton; +import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.support.PollingConsumerSupport; @@ -36,6 +37,11 @@ public class SedaPollingConsumer extends PollingConsumerSupport implements IsSin } @Override + public Processor getProcessor() { + return null; + } + + @Override public Exchange receive() { try { return getEndpoint().getQueue().take(); diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java index aae4459..332a4fe 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java +++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRouteController.java @@ -35,11 +35,13 @@ import org.apache.camel.NamedNode; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; import org.apache.camel.cluster.CamelClusterService; +import org.apache.camel.cluster.ClusterServiceSelectors; import org.apache.camel.impl.DefaultRouteController; import org.apache.camel.meta.Experimental; import org.apache.camel.model.RouteDefinition; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; +import org.apache.camel.cluster.ClusterServiceHelper; import org.apache.camel.support.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java index 98420ab..29a7d67 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicy.java @@ -37,14 +37,14 @@ import org.apache.camel.cluster.CamelClusterEventListener; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.cluster.CamelClusterService; import org.apache.camel.cluster.CamelClusterView; +import org.apache.camel.cluster.ClusterServiceSelectors; import org.apache.camel.management.event.CamelContextStartedEvent; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.cluster.ClusterServiceHelper; import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ReferenceCount; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @ManagedResource(description = "Clustered Route policy using") public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware { diff --git a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java index 9ff0e2f..ff058a2 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java +++ b/camel-core/src/main/java/org/apache/camel/impl/cluster/ClusteredRoutePolicyFactory.java @@ -20,6 +20,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.NamedNode; import org.apache.camel.RuntimeCamelException; import org.apache.camel.cluster.CamelClusterService; +import org.apache.camel.cluster.ClusterServiceSelectors; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.RoutePolicyFactory; import org.apache.camel.util.ObjectHelper; diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java index 5806f35..1971782 100644 --- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java @@ -38,6 +38,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.ManagementStatisticsLevel; +import org.apache.camel.NamedNode; import org.apache.camel.NonManagedService; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -48,11 +49,8 @@ import org.apache.camel.StartupListener; import org.apache.camel.TimerListener; import org.apache.camel.VetoCamelContextStartException; import org.apache.camel.cluster.CamelClusterService; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.DefaultEndpointRegistry; -import org.apache.camel.impl.EventDrivenConsumerRoute; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; import org.apache.camel.management.mbean.ManagedBacklogDebugger; import org.apache.camel.management.mbean.ManagedBacklogTracer; @@ -87,6 +85,7 @@ import org.apache.camel.runtimecatalog.RuntimeCamelCatalog; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.DataFormat; +import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EventNotifier; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.LifecycleStrategy; @@ -118,7 +117,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement // the wrapped processors is for performance counters, which are in use for the created routes // when a route is removed, we should remove the associated processors from this map - private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = new HashMap<>(); + private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors = new HashMap<>(); private final List<PreRegisterService> preServices = new ArrayList<>(); private final TimerListenerManager loadTimer = new ManagedLoadTimer(); private final TimerListenerManagerStartupListener loadTimerStartupListener = new TimerListenerManagerStartupListener(); @@ -149,7 +148,11 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); String name = context.getName(); - String managementName = context.getManagementNameStrategy().getName(); + String managementName = context.getManagementName(); + + if (managementName == null) { + managementName = context.getManagementNameStrategy().getName(); + } try { boolean done = false; @@ -189,9 +192,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement } // set the name we are going to use - if (context instanceof DefaultCamelContext) { - ((DefaultCamelContext) context).setManagementName(managementName); - } + context.setManagementName(managementName); try { manageObject(mc); @@ -503,8 +504,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement answer = new ManagedConsumerCache(context, (ConsumerCache) service); } else if (service instanceof ProducerCache) { answer = new ManagedProducerCache(context, (ProducerCache) service); - } else if (service instanceof DefaultEndpointRegistry) { - answer = new ManagedEndpointRegistry(context, (DefaultEndpointRegistry) service); + } else if (service instanceof EndpointRegistry) { + answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); } else if (service instanceof TypeConverterRegistry) { answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); } else if (service instanceof RestRegistry) { @@ -545,7 +546,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement // a bit of magic here as the processors we want to manage have already been registered // in the wrapped processors map when Camel have instrumented the route on route initialization // so the idea is now to only manage the processors from the map - KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); + KeyValueHolder<NamedNode, InstrumentationProcessor> holder = wrappedProcessors.get(processor); if (holder == null) { // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. return null; @@ -595,22 +596,19 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement // get the wrapped instrumentation processor from this route // and set me as the counter - if (route instanceof EventDrivenConsumerRoute) { - EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; - Processor processor = edcr.getProcessor(); - if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { - CamelInternalProcessor internal = (CamelInternalProcessor) processor; - ManagedRoute routeMBean = (ManagedRoute) mr; - - CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); - if (task != null) { - // we need to wrap the counter with the camel context so we get stats updated on the context as well - if (camelContextMBean != null) { - CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); - task.setCounter(wrapper); - } else { - task.setCounter(routeMBean); - } + Processor processor = route.getProcessor(); + if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { + CamelInternalProcessor internal = (CamelInternalProcessor) processor; + ManagedRoute routeMBean = (ManagedRoute) mr; + + CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); + if (task != null) { + // we need to wrap the counter with the camel context so we get stats updated on the context as well + if (camelContextMBean != null) { + CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); + task.setCounter(wrapper); + } else { + task.setCounter(routeMBean); } } } @@ -746,7 +744,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement // Create a map (ProcessorType -> PerformanceCounter) // to be passed to InstrumentationInterceptStrategy. - Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = new HashMap<>(); + Map<NamedNode, PerformanceCounter> registeredCounters = new HashMap<>(); // Each processor in a route will have its own performance counter. // These performance counter will be embedded to InstrumentationProcessor @@ -775,9 +773,9 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement for (Route route : routes) { String id = route.getId(); - Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); + Iterator<KeyValueHolder<NamedNode, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); while (it.hasNext()) { - KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); + KeyValueHolder<NamedNode, InstrumentationProcessor> holder = it.next(); RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); if (def != null && id.equals(def.getId())) { it.remove(); @@ -788,7 +786,7 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement } private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, - Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { + Map<NamedNode, PerformanceCounter> registeredCounters) { // traverse children if any exists List<ProcessorDefinition<?>> children = processor.getOutputs(); diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java index 0a1fe79..186fc2c 100644 --- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java +++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java @@ -22,7 +22,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.NamedNode; import org.apache.camel.Processor; import org.apache.camel.management.mbean.ManagedPerformanceCounter; -import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.util.KeyValueHolder; @@ -36,21 +35,21 @@ import org.apache.camel.util.KeyValueHolder; */ public class InstrumentationInterceptStrategy implements InterceptStrategy { - private Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters; - private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors; + private Map<NamedNode, PerformanceCounter> registeredCounters; + private final Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors; - public InstrumentationInterceptStrategy(Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters, - Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors) { + public InstrumentationInterceptStrategy(Map<NamedNode, PerformanceCounter> registeredCounters, + Map<Processor, KeyValueHolder<NamedNode, InstrumentationProcessor>> wrappedProcessors) { this.registeredCounters = registeredCounters; this.wrappedProcessors = wrappedProcessors; } - public PerformanceCounter prepareProcessor(ProcessorDefinition<?> definition, Processor target, InstrumentationProcessor advice) { + public PerformanceCounter prepareProcessor(NamedNode definition, Processor target, InstrumentationProcessor advice) { PerformanceCounter counter = registeredCounters.get(definition); if (counter != null) { // add it to the mapping of wrappers so we can later change it to a // decorated counter when we register the processor - KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice); + KeyValueHolder<NamedNode, InstrumentationProcessor> holder = new KeyValueHolder<>(definition, advice); wrappedProcessors.put(target, holder); } return counter; diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java index cb4c427..52b8cb5 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedClusterService.java @@ -25,7 +25,7 @@ import org.apache.camel.ServiceStatus; import org.apache.camel.StatefulService; import org.apache.camel.api.management.mbean.ManagedClusterServiceMBean; import org.apache.camel.cluster.CamelClusterService; -import org.apache.camel.impl.cluster.ClusterServiceHelper; +import org.apache.camel.cluster.ClusterServiceHelper; import org.apache.camel.spi.ManagementStrategy; public class ManagedClusterService implements ManagedClusterServiceMBean { diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java index 8030eae..2f49504d 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingExceptionRoutePolicy.java @@ -19,8 +19,8 @@ package org.apache.camel.management.mbean; import org.apache.camel.CamelContext; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean; -import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; @ManagedResource(description = "Managed ThrottlingExceptionRoutePolicy") public class ManagedThrottlingExceptionRoutePolicy extends ManagedService implements ManagedThrottlingExceptionRoutePolicyMBean { diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java index f8c1c74..7c287e4 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedThrottlingInflightRoutePolicy.java @@ -20,7 +20,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.LoggingLevel; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedThrottlingInflightRoutePolicyMBean; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; @ManagedResource(description = "Managed ThrottlingInflightRoutePolicy") public class ManagedThrottlingInflightRoutePolicy extends ManagedService implements ManagedThrottlingInflightRoutePolicyMBean { diff --git a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java index 4b424bd..17db223 100644 --- a/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/support/EventDrivenPollingConsumer.java @@ -67,6 +67,11 @@ public class EventDrivenPollingConsumer extends PollingConsumerSupport implement this.interruptedExceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), EventDrivenPollingConsumer.class); } + @Override + public Processor getProcessor() { + return this; + } + public boolean isBlockWhenFull() { return blockWhenFull; } diff --git a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java index d69a92f..09224fc 100644 --- a/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/PollingConsumerSupport.java @@ -18,6 +18,7 @@ package org.apache.camel.support; import org.apache.camel.Endpoint; import org.apache.camel.PollingConsumer; +import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; /** @@ -42,6 +43,11 @@ public abstract class PollingConsumerSupport extends ServiceSupport implements P return endpoint; } + @Override + public Processor getProcessor() { + return null; + } + public ExceptionHandler getExceptionHandler() { return exceptionHandler; } diff --git a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java index 9893746..17a63f2 100644 --- a/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/support/ProcessorPollingConsumer.java @@ -37,6 +37,11 @@ public class ProcessorPollingConsumer extends PollingConsumerSupport implements this.processor = processor; } + @Override + public Processor getProcessor() { + return processor; + } + protected void doStart() throws Exception { ServiceHelper.startService(processor); } diff --git a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java index a74eea8..90d20e4 100644 --- a/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/RoutePolicySupport.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit; import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Route; -import org.apache.camel.Suspendable; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.RoutePolicy; @@ -107,21 +106,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route * @return <tt>true</tt> if the consumer was suspended or stopped, <tt>false</tt> if the consumer was already suspend or stopped */ public boolean suspendOrStopConsumer(Consumer consumer) throws Exception { - if (consumer instanceof Suspendable) { - boolean suspended = ServiceHelper.suspendService(consumer); - if (suspended) { - log.debug("Suspended consumer {}", consumer); - } else { - log.trace("Consumer already suspended {}", consumer); - } - return suspended; - } - if (!ServiceHelper.isStopped(consumer)) { - ServiceHelper.stopService(consumer); - log.debug("Stopped consumer {}", consumer); - return true; + boolean suspended = ServiceHelper.suspendService(consumer); + if (suspended) { + log.debug("Suspended consumer {}", consumer); + } else { + log.trace("Consumer already suspended {}", consumer); } - return false; + return suspended; } /** @@ -134,21 +125,13 @@ public abstract class RoutePolicySupport extends ServiceSupport implements Route * @return <tt>true</tt> if the consumer was resumed or started, <tt>false</tt> if the consumer was already resumed or started */ public boolean resumeOrStartConsumer(Consumer consumer) throws Exception { - if (consumer instanceof Suspendable) { - boolean resumed = ServiceHelper.resumeService(consumer); - if (resumed) { - log.debug("Resumed consumer {}", consumer); - } else { - log.trace("Consumer already resumed {}", consumer); - } - return resumed; - } - if (!ServiceHelper.isStarted(consumer)) { - ServiceHelper.startService(consumer); - log.debug("Started consumer {}", consumer); - return true; + boolean resumed = ServiceHelper.resumeService(consumer); + if (resumed) { + log.debug("Resumed consumer {}", consumer); + } else { + log.trace("Consumer already resumed {}", consumer); } - return false; + return resumed; } public void startRoute(Route route) throws Exception { diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java similarity index 96% rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java index 84607e76..7bdb502 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionHalfOpenHandler.java +++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionHalfOpenHandler.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl; +package org.apache.camel.throttling; /** * Used by the {@link ThrottlingExceptionRoutePolicy} to allow custom code diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java similarity index 99% rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java index 042559a..86f3dcb 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingExceptionRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingExceptionRoutePolicy.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl; +package org.apache.camel.throttling; import java.util.List; import java.util.Timer; @@ -30,8 +30,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Route; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.support.RoutePolicySupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Modeled after the circuit breaker {@link ThrottlingInflightRoutePolicy} diff --git a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java similarity index 98% rename from camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java rename to camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java index 4947c47..d662c50 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ThrottlingInflightRoutePolicy.java +++ b/camel-core/src/main/java/org/apache/camel/throttling/ThrottlingInflightRoutePolicy.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.impl; +package org.apache.camel.throttling; import java.util.EventObject; import java.util.LinkedHashSet; @@ -34,7 +34,6 @@ import org.apache.camel.support.EventNotifierSupport; import org.apache.camel.support.RoutePolicySupport; import org.apache.camel.support.ServiceHelper; import org.apache.camel.util.ObjectHelper; -import org.slf4j.LoggerFactory; /** * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic @@ -224,7 +223,7 @@ public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements } protected CamelLogger createLogger() { - return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel()); + return new CamelLogger(log, getLoggingLevel()); } private int getSize(Route route, Exchange exchange) { diff --git a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java index 5d61765..03c529e 100644 --- a/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java +++ b/camel-core/src/test/java/org/apache/camel/component/test/TestEndpointTest.java @@ -79,6 +79,11 @@ public class TestEndpointTest extends ContextTestSupport { } @Override + public Processor getProcessor() { + return null; + } + + @Override public void start() throws Exception { // when starting then send a message to the processor Exchange exchange = new DefaultExchange(getEndpoint()); diff --git a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java index 318e1fc..d0897da 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/cluster/ClusterServiceSelectorTest.java @@ -24,13 +24,14 @@ import org.apache.camel.CamelContext; import org.apache.camel.cluster.CamelClusterMember; import org.apache.camel.cluster.CamelClusterService; import org.apache.camel.cluster.CamelClusterView; +import org.apache.camel.cluster.ClusterServiceSelectors; import org.apache.camel.component.file.cluster.FileLockClusterService; import org.apache.camel.impl.DefaultCamelContext; import org.junit.Assert; import org.junit.Test; -import static org.apache.camel.impl.cluster.ClusterServiceHelper.lookupService; -import static org.apache.camel.impl.cluster.ClusterServiceHelper.mandatoryLookupService; +import static org.apache.camel.cluster.ClusterServiceHelper.lookupService; +import static org.apache.camel.cluster.ClusterServiceHelper.mandatoryLookupService; public class ClusterServiceSelectorTest { diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java index 222f3b6..aef3ea8 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java @@ -27,8 +27,8 @@ import org.apache.camel.Processor; import org.apache.camel.ServiceStatus; import org.apache.camel.api.management.mbean.ManagedThrottlingExceptionRoutePolicyMBean; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.junit.Test; public class ManagedThrottlingExceptionRoutePolicyTest extends ManagementTestSupport { diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java index 43141da..2cb25ae 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlingInflightRoutePolicyTest.java @@ -23,7 +23,7 @@ import javax.management.ObjectName; import org.apache.camel.ServiceStatus; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.junit.Test; public class ManagedThrottlingInflightRoutePolicyTest extends ManagementTestSupport { diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java index 44b89c5..fd66596 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerSedaTest.java @@ -25,8 +25,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.support.ServiceSupport; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java index 24e2f5f..72da3aa 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenHandlerTest.java @@ -25,8 +25,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.support.ServiceSupport; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java index 49ff49a..9791f21 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyHalfOpenTest.java @@ -25,7 +25,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.support.ServiceSupport; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java index b650dbd..73f30a8 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyKeepOpenOnInitTest.java @@ -19,7 +19,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java index c096826..27fb6be 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyOpenViaConfigTest.java @@ -19,7 +19,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java index c944576..7a410bc 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingExceptionRoutePolicyTest.java @@ -25,8 +25,8 @@ import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingExceptionHalfOpenHandler; -import org.apache.camel.impl.ThrottlingExceptionRoutePolicy; +import org.apache.camel.throttling.ThrottlingExceptionHalfOpenHandler; +import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.support.ServiceSupport; import org.junit.Before; import org.junit.Test; diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java index d3364e6..935a388 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingInflightRoutePolicyTest.java @@ -18,7 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.junit.Test; public class ThrottlingInflightRoutePolicyTest extends ContextTestSupport { diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index d72e7ab..63afcea 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -59,6 +59,11 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe this.processor = AsyncProcessorConverterHelper.convert(processor); } + @Override + public AsyncProcessor getProcessor() { + return processor; + } + public ExceptionHandler getExceptionHandler() { if (exceptionHandler == null) { exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java index 0d01804..6b8e2de 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsThrottlingInflightRoutePolicyTest.java @@ -21,7 +21,7 @@ import javax.jms.ConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; diff --git a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml index d0dac16..ba982c5 100644 --- a/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml +++ b/components/camel-jms/src/test/resources/org/apache/camel/component/jms/tx/JMSTransactionThrottlingRoutePolicyTest.xml @@ -49,7 +49,7 @@ <property name="transactionManager" ref="jmsTransactionManager"/> </bean> - <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> + <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy"> <property name="maxInflightExchanges" value="16"/> <property name="resumePercentOfMax" value="25"/> </bean> diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java index a877b62..8f31a07 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterComponent.java @@ -23,7 +23,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.cluster.CamelClusterService; import org.apache.camel.support.DefaultComponent; import org.apache.camel.impl.cluster.ClusterServiceHelper; -import org.apache.camel.impl.cluster.ClusterServiceSelectors; +import org.apache.camel.cluster.ClusterServiceSelectors; import org.apache.camel.spi.Metadata; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StringHelper; diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java index 21894ce..56294ac 100644 --- a/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java +++ b/components/camel-quartz2/src/test/java/org/apache/camel/routepolicy/quartz2/MultiplePoliciesOnRouteTest.java @@ -23,7 +23,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.quartz2.QuartzComponent; import org.apache.camel.impl.JndiRegistry; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; diff --git a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml index cbb9462..9792fbe 100644 --- a/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml +++ b/components/camel-quartz2/src/test/resources/org/apache/camel/routepolicy/quartz2/MultiplePolicies.xml @@ -34,7 +34,7 @@ <property name="routeStartRepeatInterval" value="3000"/> </bean> - <bean id="throttlePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> + <bean id="throttlePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy"> <property name="maxInflightExchanges" value="10"/> </bean> diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java index 728314c..2e95e92 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BackpressurePublisherRoutePolicyTest.java @@ -26,7 +26,7 @@ import org.apache.camel.StatefulService; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.support.TestSubscriber; -import org.apache.camel.impl.ThrottlingInflightRoutePolicy; +import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.reactivestreams.Publisher; diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml index cdb10c6..402baff 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlingInflightRoutePolicyTest.xml @@ -26,7 +26,7 @@ <!-- START SNIPPET: e1 --> <!-- configure our route policy to throttling based --> - <bean id="myRoutePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> + <bean id="myRoutePolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy"> <!-- we want at most 10 concurrent inflight exchanges --> <property name="maxInflightExchanges" value="10"/> <!-- and we want a low water mark value of 20% of the max which means that diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java index 07da23d..122c002 100644 --- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java +++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java @@ -29,7 +29,7 @@ import org.apache.camel.support.ServiceHelper; /** * Connector {@link Producer} which is capable of performing before and after custom processing - * via the {@link Pipeline }while processing (ie sending the message). + * via the {@link Pipeline} while processing (ie sending the message). */ public class ConnectorProducer extends DefaultAsyncProducer { diff --git a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml index 5f1d890..34f08d7 100644 --- a/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml +++ b/examples/camel-example-route-throttling/src/main/resources/META-INF/spring/camel-server.xml @@ -42,7 +42,7 @@ </bean> <!-- START SNIPPET: e1 --> - <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> + <bean id="myPolicy" class="org.apache.camel.throttling.ThrottlingInflightRoutePolicy"> <!-- define the scope to be context scoped so we measure against total inflight exchanges that means for both route1, route2 and route3 all together --> <property name="scope" value="Context"/>
