This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 292d999766c CAMEL-22154: camel-core - BackOffTask make it possible to
manage and observe (#18430)
292d999766c is described below
commit 292d999766c9dfe53cfbed762f31886f19647490
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 23 10:00:13 2025 +0200
CAMEL-22154: camel-core - BackOffTask make it possible to manage and
observe (#18430)
* CAMEL-22154: camel-core - BackOffTask make it possible to manage and
observe
---
.../apache/camel/catalog/dev-consoles.properties | 1 +
.../apache/camel/catalog/dev-consoles/backoff.json | 15 ++
.../camel/component/master/MasterConsumer.java | 16 +-
.../camel/component/pgevent/PgEventConsumer.java | 18 +-
.../consumer/SimpleMessageListenerContainer.java | 13 +-
.../org/apache/camel/spi/BackOffTimerFactory.java | 53 ++++++
.../camel/support/service/ServiceHelper.java | 1 +
.../camel/impl/engine/AbstractCamelContext.java | 4 +
.../camel/impl/engine/DefaultBackOffTimer.java | 94 ++++++++++
.../impl/engine/DefaultBackOffTimerFactory.java | 47 +++++
.../engine/DefaultSupervisingRouteController.java | 27 +--
.../camel/impl/engine/SimpleCamelContext.java | 6 +
.../org/apache/camel/dev-console/backoff.json | 15 ++
.../services/org/apache/camel/dev-console/backoff | 2 +
.../org/apache/camel/dev-consoles.properties | 2 +-
.../camel/impl/console/BackOffDevConsole.java | 93 ++++++++++
.../camel/impl/console/RouteControllerConsole.java | 2 +-
.../api/management/mbean/CamelOpenMBeanTypes.java | 19 ++
.../management/mbean/ManagedBackoffTimerMBean.java | 35 ++++
.../management/JmxManagementLifecycleStrategy.java | 4 +
.../management/mbean/ManagedBackoffTimer.java | 85 +++++++++
.../camel/management/ManagedBackOffTimerTest.java | 113 ++++++++++++
.../org/apache/camel/support/PluginHelper.java | 9 +
.../main/java/org/apache/camel/util/TimeUtils.java | 31 +++-
.../org/apache/camel/util/backoff/BackOff.java | 29 ++-
.../apache/camel/util/backoff/BackOffTimer.java | 61 ++++---
.../camel/util/backoff/BackOffTimerTask.java | 40 ++++-
.../camel/util/backoff/SimpleBackOffTimer.java | 96 ++++++++++
.../org/apache/camel/util/backoff/BackOffTest.java | 8 +-
...fTimerTest.java => SimpleBackOffTimerTest.java} | 74 +++++++-
.../ROOT/pages/camel-4x-upgrade-guide-4_13.adoc | 15 ++
.../camel/cli/connector/LocalCliConnector.java | 7 +
.../dsl/jbang/core/commands/CamelJBangMain.java | 1 +
.../jbang/core/commands/process/ListBackOff.java | 195 +++++++++++++++++++++
.../DependencyDownloaderPropertiesComponent.java | 3 +-
35 files changed, 1161 insertions(+), 73 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
index e33c068a992..37744abd89c 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
@@ -1,6 +1,7 @@
aws-secrets
aws2-s3
azure-secrets
+backoff
bean
bean-model
blocked
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json
new file mode 100644
index 00000000000..8cae12a4e92
--- /dev/null
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "backoff",
+ "title": "BackOff",
+ "description": "Display information about BackOff tasks",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.BackOffDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.13.0-SNAPSHOT"
+ }
+}
+
diff --git
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index ded0576fe2b..652c82e2865 100644
---
a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++
b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -31,6 +31,7 @@ import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.resume.AdapterHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.backoff.BackOff;
@@ -53,6 +54,7 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
private volatile Consumer delegatedConsumer;
private volatile CamelClusterView view;
private ResumeStrategy resumeStrategy;
+ private BackOffTimer timer;
public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor,
CamelClusterService clusterService) {
super(masterEndpoint, processor);
@@ -74,10 +76,19 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
this.resumeStrategy = resumeStrategy;
}
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ this.timer =
PluginHelper.getBackOffTimerFactory(masterEndpoint.getCamelContext().getCamelContextExtension())
+ .newBackOffTimer("MasterConsumer",
masterEndpoint.getComponent().getBackOffThreadPool());
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
+ ServiceHelper.startService(timer);
+
LOG.debug("Using ClusterService instance {} (id={}, type={})",
clusterService, clusterService.getId(),
clusterService.getClass().getName());
@@ -92,12 +103,10 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
if (view != null) {
view.removeEventListener(leadershipListener);
clusterService.releaseView(view);
-
view = null;
}
- ServiceHelper.stopAndShutdownServices(delegatedConsumer,
delegatedEndpoint);
-
+ ServiceHelper.stopAndShutdownServices(delegatedConsumer,
delegatedEndpoint, timer);
delegatedConsumer = null;
}
@@ -141,7 +150,6 @@ public class MasterConsumer extends DefaultConsumer
implements ResumeAware<Resum
long delay = masterEndpoint.getComponent().getBackOffDelay();
long max = masterEndpoint.getComponent().getBackOffMaxAttempts();
- BackOffTimer timer = new
BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
timer.schedule(BackOff.builder().delay(delay).maxAttempts(max).build(), task ->
{
LOG.info("Leadership taken. Attempt #{} to start consumer:
{}", task.getCurrentAttempts(),
delegatedEndpoint);
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index 2b0509fb0b4..e5d52ef9db1 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -26,6 +26,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.backoff.BackOff;
import org.apache.camel.util.backoff.BackOffTimer;
import org.slf4j.Logger;
@@ -56,8 +58,7 @@ public class PgEventConsumer extends DefaultConsumer {
}
@Override
- protected void doStart() throws Exception {
- super.doStart();
+ protected void doInit() throws Exception {
if (endpoint.getWorkerPool() != null) {
workerPool = endpoint.getWorkerPool();
} else {
@@ -67,8 +68,15 @@ public class PgEventConsumer extends DefaultConsumer {
// used for re-connecting to the database
reconnectPool =
getEndpoint().getCamelContext().getExecutorServiceManager()
.newSingleThreadScheduledExecutor(this, "Reconnector");
- timer = new BackOffTimer(reconnectPool);
+ timer =
PluginHelper.getBackOffTimerFactory(endpoint.getCamelContext().getCamelContextExtension())
+ .newBackOffTimer("PgEventConsumer", reconnectPool);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ServiceHelper.startService(timer);
listener.initConnection();
+ super.doStart();
}
@Override
@@ -76,12 +84,12 @@ public class PgEventConsumer extends DefaultConsumer {
super.doStop();
listener.closeConnection();
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(reconnectPool);
- timer = null;
if (shutdownWorkerPool && workerPool != null) {
LOG.debug("Shutting down PgEventConsumer worker threads with
timeout {} millis", 10000);
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool,
10000);
workerPool = null;
}
+ ServiceHelper.stopService(timer);
}
public class PgEventListener implements PGNotificationListener {
@@ -89,7 +97,7 @@ public class PgEventConsumer extends DefaultConsumer {
public void reconnect() {
BackOff bo =
BackOff.builder().delay(endpoint.getReconnectDelay()).build();
timer.schedule(bo, t -> {
- LOG.debug("Connecting attempt #" + t.getCurrentAttempts());
+ LOG.debug("Connecting attempt #{}", t.getCurrentAttempts());
try {
initConnection();
} catch (Exception e) {
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
index cdc902c8998..bcf9c56fc91 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java
@@ -35,6 +35,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.component.sjms.SessionMessageListener;
import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.jms.DestinationCreationStrategy;
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.backoff.BackOff;
import org.apache.camel.util.backoff.BackOffTimer;
@@ -63,6 +65,7 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
private Set<MessageConsumer> consumers;
private Set<Session> sessions;
private BackOffTimer.Task recoverTask;
+ private BackOffTimer timer;
private ScheduledExecutorService scheduler;
public SimpleMessageListenerContainer(SjmsEndpoint endpoint) {
@@ -219,7 +222,13 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
// we need to recover using a background task
if (recoverTask == null || recoverTask.getStatus() !=
BackOffTimer.Task.Status.Active) {
BackOff backOff =
BackOff.builder().delay(endpoint.getRecoveryInterval()).build();
- recoverTask = new BackOffTimer(scheduler).schedule(backOff,
this::recoverConnection);
+ if (timer == null) {
+ timer =
PluginHelper.getBackOffTimerFactory(endpoint.getCamelContext().getCamelContextExtension())
+ .newBackOffTimer("SjmsConnectionRecovery",
+ scheduler);
+ ServiceHelper.startService(timer);
+ }
+ recoverTask = timer.schedule(backOff, this::recoverConnection);
}
}
@@ -242,6 +251,8 @@ public class SimpleMessageListenerContainer extends
ServiceSupport
endpoint.getCamelContext().getExecutorServiceManager().shutdown(scheduler);
scheduler = null;
}
+ ServiceHelper.stopService(timer);
+ timer = null;
}
@Override
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java
new file mode 100644
index 00000000000..497f084582a
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/BackOffTimerFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.util.backoff.BackOffTimer;
+
+/**
+ * Factory for creating {@link BackOffTimer}.
+ *
+ * @see org.apache.camel.util.backoff.BackOff
+ */
+public interface BackOffTimerFactory {
+
+ /**
+ * Creates a new {@link BackOffTimer}.
+ *
+ * Important: The timer should be started and stopped to control its
lifecycle by using
+ * {@link org.apache.camel.support.service.ServiceHelper}.
+ *
+ * @param name logical name of the timer
+ * @return new empty backoff timer
+ */
+ BackOffTimer newBackOffTimer(String name);
+
+ /**
+ * Creates a new {@link BackOffTimer} using the given executor service.
+ *
+ * Important: The timer should be started and stopped to control its
lifecycle by using
+ * {@link org.apache.camel.support.service.ServiceHelper}.
+ *
+ * @param name logical name of the timer
+ * @param scheduler thread pool to use for running tasks
+ * @return new empty backoff timer
+ */
+ BackOffTimer newBackOffTimer(String name, ScheduledExecutorService
scheduler);
+
+}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
index 1c464db6af9..2c613f4a882 100644
---
a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
+++
b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
* A collection of helper methods for working with {@link Service} objects.
*/
public final class ServiceHelper {
+
private static final Logger LOG =
LoggerFactory.getLogger(ServiceHelper.class);
/**
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 0fddc3ac0ac..e1b7d794251 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -87,6 +87,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogDebugger;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
import org.apache.camel.spi.AnnotationScanTypeConverters;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.BackOffTimerFactory;
import org.apache.camel.spi.BacklogDebugger;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
@@ -384,6 +385,7 @@ public abstract class AbstractCamelContext extends
BaseService
camelContextExtension.lazyAddContextPlugin(AnnotationBasedProcessorFactory.class,
this::createAnnotationBasedProcessorFactory);
camelContextExtension.lazyAddContextPlugin(DumpRoutesStrategy.class,
this::createDumpRoutesStrategy);
+ camelContextExtension.lazyAddContextPlugin(BackOffTimerFactory.class,
this::createBackOffTimerFactory);
}
protected static <T> T lookup(CamelContext context, String ref, Class<T>
type) {
@@ -4342,6 +4344,8 @@ public abstract class AbstractCamelContext extends
BaseService
protected abstract StartupConditionStrategy
createStartupConditionStrategy();
+ protected abstract BackOffTimerFactory createBackOffTimerFactory();
+
protected RestConfiguration createRestConfiguration() {
// lookup a global which may have been on a container such spring-boot
/ CDI / etc.
RestConfiguration conf
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java
new file mode 100644
index 00000000000..4b43296c65e
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffTimer;
+import org.apache.camel.util.backoff.BackOffTimerTask;
+import org.apache.camel.util.function.ThrowingFunction;
+
+/**
+ * Default {@link BackOffTimer}.
+ */
+public class DefaultBackOffTimer extends ServiceSupport implements
BackOffTimer {
+
+ private final CamelContext camelContext;
+ private final ScheduledExecutorService scheduler;
+ private final String name;
+ private final Set<Task> tasks = new CopyOnWriteArraySet<>();
+
+ public DefaultBackOffTimer(CamelContext camelContext, String name,
ScheduledExecutorService scheduler) {
+ this.camelContext = camelContext;
+ this.scheduler = scheduler;
+ this.name = name;
+ }
+
+ @Override
+ public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean,
Exception> function) {
+ final BackOffTimerTask task = new BackOffTimerTask(this, backOff,
scheduler, function);
+
+ long delay = task.next();
+ if (delay != BackOff.NEVER) {
+ tasks.add(task);
+ scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
+ } else {
+ task.cancel();
+ }
+
+ return task;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void remove(Task task) {
+ tasks.remove(task);
+ }
+
+ @Override
+ public Set<Task> getTasks() {
+ return Collections.unmodifiableSet(tasks);
+ }
+
+ @Override
+ public int size() {
+ return tasks.size();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ camelContext.addService(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ tasks.clear();
+ camelContext.removeService(this);
+ }
+
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java
new file mode 100644
index 00000000000..005bd6dbaa1
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultBackOffTimerFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.BackOffTimerFactory;
+import org.apache.camel.util.backoff.BackOffTimer;
+
+/**
+ * Default {@link BackOffTimerFactory}.
+ */
+public class DefaultBackOffTimerFactory implements BackOffTimerFactory {
+
+ private final CamelContext camelContext;
+
+ public DefaultBackOffTimerFactory(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public BackOffTimer newBackOffTimer(String name) {
+ var scheduler =
camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"BackOffTimer");
+ return newBackOffTimer(name, scheduler);
+ }
+
+ @Override
+ public BackOffTimer newBackOffTimer(String name, ScheduledExecutorService
scheduler) {
+ return new DefaultBackOffTimer(camelContext, name, scheduler);
+ }
+
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
index bcaaa898d1b..0b7a8c8df59 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
@@ -52,7 +52,9 @@ import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.SupervisingRouteController;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.backoff.BackOff;
@@ -231,14 +233,8 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
@Override
protected void doStart() throws Exception {
- this.backOff = new BackOff(
- Duration.ofMillis(backOffDelay),
- backOffMaxDelay > 0 ? Duration.ofMillis(backOffMaxDelay) :
null,
- backOffMaxElapsedTime > 0 ?
Duration.ofMillis(backOffMaxElapsedTime) : null,
- backOffMaxAttempts > 0 ? backOffMaxAttempts : Long.MAX_VALUE,
- backOffMultiplier);
-
CamelContext context = getCamelContext();
+
if (threadPoolSize == 1) {
executorService
=
context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
"SupervisingRouteController");
@@ -246,16 +242,25 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
executorService =
context.getExecutorServiceManager().newScheduledThreadPool(this,
"SupervisingRouteController",
threadPoolSize);
}
- timer = new BackOffTimer(executorService);
+ backOff = new BackOff(
+ Duration.ofMillis(backOffDelay),
+ backOffMaxDelay > 0 ? Duration.ofMillis(backOffMaxDelay) :
null,
+ backOffMaxElapsedTime > 0 ?
Duration.ofMillis(backOffMaxElapsedTime) : null,
+ backOffMaxAttempts > 0 ? backOffMaxAttempts : Long.MAX_VALUE,
+ backOffMultiplier, false);
+
+ timer =
PluginHelper.getBackOffTimerFactory(context.getCamelContextExtension())
+ .newBackOffTimer("SupervisingRouteController",
executorService);
+ ServiceHelper.startService(timer);
}
@Override
protected void doStop() throws Exception {
- if (getCamelContext() != null && executorService != null) {
+ if (getCamelContext() != null) {
getCamelContext().getExecutorServiceManager().shutdown(executorService);
- executorService = null;
- timer = null;
}
+ executorService = null;
+ ServiceHelper.stopService(timer);
}
// *********************************
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index e964214d1fb..715557ebbc9 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -34,6 +34,7 @@ import org.apache.camel.health.HealthCheckResolver;
import org.apache.camel.impl.converter.DefaultTypeConverter;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.BackOffTimerFactory;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
import org.apache.camel.spi.BeanProxyFactory;
@@ -740,6 +741,11 @@ public class SimpleCamelContext extends
AbstractCamelContext {
return new DefaultStartupConditionStrategy();
}
+ @Override
+ protected BackOffTimerFactory createBackOffTimerFactory() {
+ return new DefaultBackOffTimerFactory(this);
+ }
+
@Override
protected TransformerRegistry createTransformerRegistry() {
return new DefaultTransformerRegistry(getCamelContextReference());
diff --git
a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json
new file mode 100644
index 00000000000..8cae12a4e92
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "backoff",
+ "title": "BackOff",
+ "description": "Display information about BackOff tasks",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.BackOffDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.13.0-SNAPSHOT"
+ }
+}
+
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff
new file mode 100644
index 00000000000..88b4c5ca27b
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.BackOffDevConsole
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
index cef4ebaffbb..1ff8974c70e 100644
---
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -1,5 +1,5 @@
# Generated by camel build tools - do NOT edit this file!
-dev-consoles=bean blocked browse circuit-breaker consumer context debug
endpoint event gc health inflight java-security jvm log memory properties
receive reload rest route route-controller route-dump send service source
startup-recorder system-properties thread top trace transformers
type-converters variables
+dev-consoles=backoff bean blocked browse circuit-breaker consumer context
debug endpoint event gc health inflight java-security jvm log memory properties
receive reload rest route route-controller route-dump send service source
startup-recorder system-properties thread top trace transformers
type-converters variables
groupId=org.apache.camel
artifactId=camel-console
version=4.13.0-SNAPSHOT
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java
new file mode 100644
index 00000000000..4b8f03f3e26
--- /dev/null
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.util.backoff.BackOffTimer;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+
+@DevConsole(name = "backoff", displayName = "BackOff", description = "Display
information about BackOff tasks")
+public class BackOffDevConsole extends AbstractDevConsole {
+
+ public BackOffDevConsole() {
+ super("camel", "backoff", "BackOff", "Display information about
BackOff tasks");
+ }
+
+ @Override
+ protected String doCallText(Map<String, Object> options) {
+ StringBuilder sb = new StringBuilder();
+
+ Set<BackOffTimer> timers =
getCamelContext().hasServices(BackOffTimer.class);
+ for (BackOffTimer timer : timers) {
+ sb.append(String.format("\nTimer: %s", timer.getName()));
+ sb.append(String.format("\nTasks: %s", timer.size()));
+ int id = 0;
+ for (BackOffTimer.Task task : timer.getTasks()) {
+ String failure = task.getException() != null ?
task.getException().getMessage() : "";
+ sb.append(String.format(
+ "\n #%d (name=%s status=%s attempts=%d delay=%d
elapsed=%d first=%d last=%d next=%d failure=%s",
+ id, task.getName(), task.getStatus().name(),
task.getCurrentAttempts(), task.getCurrentDelay(),
+ task.getCurrentElapsedTime(),
task.getFirstAttemptTime(), task.getLastAttemptTime(),
+ task.getNextAttemptTime(), failure));
+ id++;
+ }
+ }
+ return sb.toString();
+ }
+
+ @Override
+ protected JsonObject doCallJson(Map<String, Object> options) {
+ JsonObject root = new JsonObject();
+ JsonArray arr = new JsonArray();
+ root.put("timers", arr);
+
+ Set<BackOffTimer> timers =
getCamelContext().hasServices(BackOffTimer.class);
+ for (BackOffTimer timer : timers) {
+ JsonObject jo = new JsonObject();
+ jo.put("name", timer.getName());
+ jo.put("size", timer.size());
+ arr.add(jo);
+ if (timer.size() > 0) {
+ JsonArray arr2 = new JsonArray();
+ jo.put("tasks", arr2);
+ for (BackOffTimer.Task task : timer.getTasks()) {
+ String failure = task.getException() != null ?
task.getException().getMessage() : "";
+ JsonObject jo2 = new JsonObject();
+ jo2.put("name", task.getName());
+ jo2.put("status", task.getStatus().name());
+ jo2.put("attempts", task.getCurrentAttempts());
+ jo2.put("delay", task.getCurrentDelay());
+ jo2.put("elapsed", task.getCurrentElapsedTime());
+ jo2.put("firstTime", task.getFirstAttemptTime());
+ jo2.put("lastTime", task.getLastAttemptTime());
+ jo2.put("nextTime", task.getNextAttemptTime());
+ if (failure != null) {
+ jo2.put("error", failure);
+ }
+ arr2.add(jo2);
+ }
+ }
+ }
+ return root;
+ }
+
+}
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java
index 6555d865ca4..2dc6167ee8c 100644
---
a/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/RouteControllerConsole.java
@@ -119,7 +119,7 @@ public class RouteControllerConsole extends
AbstractDevConsole {
sb.append(String.format("\n %s %s (%s) ", status,
routeId, uri));
sb.append(String.format("\n Supervising: %s",
supervising));
sb.append(String.format("\n Attempts: %s",
attempts));
- sb.append(String.format("\n Last Ago: %s",
last));
+ sb.append(String.format("\n Last: %s", last));
sb.append(String.format("\n Next Attempt: %s",
next));
sb.append(String.format("\n Elapsed: %s",
elapsed));
if (error != null) {
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 6a9fde81336..bdc658ca333 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -327,4 +327,23 @@ public final class CamelOpenMBeanTypes {
return new TabularType("variables", "Variables", ct, new String[] {
"id", "key" });
}
+ public static TabularType listBackoffTTaskTabularType() throws
OpenDataException {
+ CompositeType ct = listBackoffTaskCompositeType();
+ return new TabularType(
+ "listBackoff", "Lists all the backoff tasks", ct,
+ new String[] { "name" });
+ }
+
+ public static CompositeType listBackoffTaskCompositeType() throws
OpenDataException {
+ return new CompositeType(
+ "tasks", "Tasks",
+ new String[] {
+ "name", "status", "attempts", "delay", "elapsed",
"firstTime", "lastTime", "nextTime", "failure" },
+ new String[] {
+ "Name", "Status", "Attempts", "Delay", "Elapsed",
"FirstTime", "LastTime", "NextTime", "Failure" },
+ new OpenType[] {
+ SimpleType.STRING, SimpleType.STRING, SimpleType.LONG,
SimpleType.LONG, SimpleType.LONG,
+ SimpleType.LONG, SimpleType.LONG, SimpleType.LONG,
SimpleType.STRING });
+ }
+
}
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java
new file mode 100644
index 00000000000..0d0812a08ba
--- /dev/null
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedBackoffTimerMBean {
+
+ @ManagedAttribute(description = "Name of the backoff timer")
+ String getName();
+
+ @ManagedAttribute(description = "Number of total tasks")
+ Integer getSize();
+
+ @ManagedOperation(description = "Lists all the tasks")
+ TabularData listTasks();
+
+}
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
index 0f058a2e009..9661bb1dfd6 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java
@@ -54,6 +54,7 @@ import org.apache.camel.impl.debugger.DefaultBacklogDebugger;
import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
import org.apache.camel.management.mbean.ManagedBacklogDebugger;
import org.apache.camel.management.mbean.ManagedBacklogTracer;
+import org.apache.camel.management.mbean.ManagedBackoffTimer;
import org.apache.camel.management.mbean.ManagedBeanIntrospection;
import org.apache.camel.management.mbean.ManagedCamelContext;
import org.apache.camel.management.mbean.ManagedConsumerCache;
@@ -118,6 +119,7 @@ import
org.apache.camel.throttling.ThrottlingExceptionRoutePolicy;
import org.apache.camel.throttling.ThrottlingInflightRoutePolicy;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.backoff.BackOffTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -577,6 +579,8 @@ public class JmxManagementLifecycleStrategy extends
ServiceSupport implements Li
answer = new ManagedValidatorRegistry(context, validatorRegistry);
} else if (service instanceof BrowsableVariableRepository
variableRepository) {
answer = new ManagedVariableRepository(context,
variableRepository);
+ } else if (service instanceof BackOffTimer timer) {
+ answer = new ManagedBackoffTimer(camelContext, timer);
} else if (service instanceof CamelClusterService camelClusterService)
{
answer =
getManagementObjectStrategy().getManagedObjectForClusterService(context,
camelClusterService);
} else if (service != null) {
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java
new file mode 100644
index 00000000000..8e2f35753f9
--- /dev/null
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Set;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Service;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedBackoffTimerMBean;
+import org.apache.camel.util.backoff.BackOffTimer;
+
+@ManagedResource(description = "Managed BackoffTimer")
+public class ManagedBackoffTimer extends ManagedService implements
ManagedBackoffTimerMBean {
+
+ private final BackOffTimer timer;
+
+ public ManagedBackoffTimer(CamelContext context, BackOffTimer timer) {
+ super(context, (Service) timer);
+ this.timer = timer;
+ }
+
+ @Override
+ public String getName() {
+ return timer.getName();
+ }
+
+ @Override
+ public Integer getSize() {
+ return timer.size();
+ }
+
+ @Override
+ public TabularData listTasks() {
+ try {
+ TabularData answer = new
TabularDataSupport(CamelOpenMBeanTypes.listBackoffTTaskTabularType());
+ Set<BackOffTimer.Task> tasks = timer.getTasks();
+ for (BackOffTimer.Task task : tasks) {
+ String name = task.getName();
+ String status = task.getStatus().name();
+ long attempts = task.getCurrentAttempts();
+ long delay = task.getCurrentDelay();
+ long elapsed = task.getCurrentElapsedTime();
+ long firstTime = task.getFirstAttemptTime();
+ long lastTime = task.getLastAttemptTime();
+ long nextTime = task.getNextAttemptTime();
+ String failure = task.getException() != null ?
task.getException().getMessage() : null;
+ CompositeType ct =
CamelOpenMBeanTypes.listBackoffTaskCompositeType();
+ CompositeData data = new CompositeDataSupport(
+ ct,
+ new String[] {
+ "name", "status", "attempts", "delay",
"elapsed", "firstTime", "lastTime", "nextTime",
+ "failure" },
+ new Object[] { name, status, attempts, delay, elapsed,
firstTime, lastTime, nextTime, failure });
+ answer.put(data);
+ }
+ return answer;
+ } catch (Exception e) {
+ throw RuntimeCamelException.wrapRuntimeCamelException(e);
+ }
+ }
+
+}
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java
new file mode 100644
index 00000000000..0aae5ca2d9c
--- /dev/null
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.support.PluginHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffTimer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@DisabledOnOs(OS.AIX)
+public class ManagedBackOffTimerTest extends ManagementTestSupport {
+
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ @Test
+ public void testManageBackOffTimer() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger counter = new AtomicInteger();
+
+ BackOffTimer timer =
PluginHelper.getBackOffTimerFactory(context.getCamelContextExtension())
+ .newBackOffTimer("Cheese");
+ ServiceHelper.startService(timer);
+
+ context.start();
+
+ // get the bean introspection for the route
+ MBeanServer mbeanServer = getMBeanServer();
+ Set<ObjectName> set = mbeanServer.queryNames(new
ObjectName("*:type=services,*"), null);
+ List<ObjectName> list = new ArrayList<>(set);
+ ObjectName on = null;
+ for (ObjectName name : list) {
+ if (name.getCanonicalName().contains("DefaultBackOffTimer")) {
+ on = name;
+ break;
+ }
+ }
+
+ assertNotNull(on, "Should have found DefaultBackOffTimer");
+
+ String name = (String) mbeanServer.getAttribute(on, "Name");
+ assertEquals("Cheese", name);
+
+ final BackOff backOff =
BackOff.builder().delay(100).removeOnComplete(false).build();
+ final AtomicLong first = new AtomicLong();
+
+ BackOffTimer.Task task = timer.schedule(
+ backOff,
+ context -> {
+ assertEquals(counter.incrementAndGet(),
context.getCurrentAttempts());
+ assertEquals(100, context.getCurrentDelay());
+ assertEquals(100L * counter.get(),
context.getCurrentElapsedTime());
+ if (first.get() == 0) {
+ first.set(context.getFirstAttemptTime());
+ } else {
+ assertEquals(first.get(),
context.getFirstAttemptTime());
+ }
+
+ return counter.get() < 5;
+ });
+
+ task.whenComplete(
+ (context, throwable) -> {
+ assertEquals(5, counter.get());
+ latch.countDown();
+ });
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ Integer size = (Integer) mbeanServer.getAttribute(on, "Size");
+ assertEquals(1, size);
+
+ TabularData data = (TabularData) mbeanServer.invoke(on, "listTasks",
null, null);
+ assertEquals(1, data.size());
+
+ ServiceHelper.stopService(timer);
+ }
+
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
index 0abc1187795..2c2c776cc78 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
@@ -25,6 +25,7 @@ import org.apache.camel.console.DevConsoleResolver;
import org.apache.camel.health.HealthCheckResolver;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.BackOffTimerFactory;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
import org.apache.camel.spi.BeanProxyFactory;
@@ -575,4 +576,12 @@ public final class PluginHelper {
ExtendedCamelContext extendedCamelContext) {
return
extendedCamelContext.getContextPlugin(AnnotationBasedProcessorFactory.class);
}
+
+ /**
+ * Gets the {@link BackOffTimerFactory} to use.
+ */
+ public static BackOffTimerFactory getBackOffTimerFactory(
+ ExtendedCamelContext extendedCamelContext) {
+ return
extendedCamelContext.getContextPlugin(BackOffTimerFactory.class);
+ }
}
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java
b/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java
index d689b8c5434..df1d3c4dcfa 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/TimeUtils.java
@@ -36,10 +36,10 @@ public final class TimeUtils {
}
/**
- * Prints the since ago in a human-readable format as 9s, 27m44s, 3h12m,
3d8h, as seen on Kubernetes etc.
+ * Prints since age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h,
as seen on Kubernetes etc.
*
* @param time time of the event (millis since epoch)
- * @return ago in human-readable since the given time.
+ * @return age in human-readable since the given time.
*/
public static String printSince(long time) {
long age = System.currentTimeMillis() - time;
@@ -47,15 +47,38 @@ public final class TimeUtils {
}
/**
- * Prints the ago in a human-readable format as 9s, 27m44s, 3h12m, 3d8h,
as seen on Kubernetes etc.
+ * Prints since age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h,
as seen on Kubernetes etc.
+ *
+ * @param time time of the event (millis since epoch)
+ * @param precise whether to be precise and include more details
+ * @return age in human-readable since the given time.
+ */
+ public static String printSince(long time, boolean precise) {
+ long age = System.currentTimeMillis() - time;
+ return printDuration(age, precise);
+ }
+
+ /**
+ * Prints the age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h,
as seen on Kubernetes etc.
*
* @param age age in millis
- * @return ago in human-readable.
+ * @return age in human-readable.
*/
public static String printAge(long age) {
return printDuration(age, false);
}
+ /**
+ * Prints the age in a human-readable format as 9s, 27m44s, 3h12m, 3d8h,
as seen on Kubernetes etc.
+ *
+ * @param age age in millis
+ * @param precise whether to be precise and include more details
+ * @return age in human-readable.
+ */
+ public static String printAge(long age, boolean precise) {
+ return printDuration(age, precise);
+ }
+
/**
* Prints the duration in a human-readable format as 9s, 27m44s, 3h12m,
3d8h, etc.
*
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java
index edbe4667908..fef421b678f 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOff.java
@@ -35,17 +35,20 @@ public final class BackOff {
private Duration maxElapsedTime;
private Long maxAttempts;
private Double multiplier;
+ private boolean removeOnComplete;
public BackOff() {
- this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE,
DEFAULT_MULTIPLIER);
+ this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE,
DEFAULT_MULTIPLIER, true);
}
- public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime,
Long maxAttempts, Double multiplier) {
+ public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime,
Long maxAttempts, Double multiplier,
+ boolean removeOnComplete) {
this.delay = ObjectHelper.supplyIfEmpty(delay, () -> DEFAULT_DELAY);
this.maxDelay = ObjectHelper.supplyIfEmpty(maxDelay, () ->
MAX_DURATION);
this.maxElapsedTime = ObjectHelper.supplyIfEmpty(maxElapsedTime, () ->
MAX_DURATION);
this.maxAttempts = ObjectHelper.supplyIfEmpty(maxAttempts, () ->
Long.MAX_VALUE);
this.multiplier = ObjectHelper.supplyIfEmpty(multiplier, () ->
DEFAULT_MULTIPLIER);
+ this.removeOnComplete = removeOnComplete;
}
// *************************************
@@ -109,6 +112,17 @@ public final class BackOff {
this.multiplier = multiplier;
}
+ public boolean isRemoveOnComplete() {
+ return removeOnComplete;
+ }
+
+ /**
+ * Should the task be removed from the timer when its complete
successfully.
+ */
+ public void setRemoveOnComplete(boolean removeOnComplete) {
+ this.removeOnComplete = removeOnComplete;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder(256);
@@ -126,6 +140,7 @@ public final class BackOff {
if (multiplier != DEFAULT_MULTIPLIER) {
sb.append(", multiplier=").append(multiplier);
}
+ sb.append(", remove=").append(removeOnComplete);
sb.append("]");
return sb.toString();
}
@@ -151,6 +166,7 @@ public final class BackOff {
private Duration maxElapsedTime = BackOff.MAX_DURATION;
private Long maxAttempts = Long.MAX_VALUE;
private Double multiplier = BackOff.DEFAULT_MULTIPLIER;
+ private boolean removeOnComplete = true;
/**
* Read values from the given {@link BackOff}
@@ -161,7 +177,7 @@ public final class BackOff {
maxElapsedTime = template.maxElapsedTime;
maxAttempts = template.maxAttempts;
multiplier = template.multiplier;
-
+ removeOnComplete = template.removeOnComplete;
return this;
}
@@ -214,11 +230,16 @@ public final class BackOff {
return this;
}
+ public Builder removeOnComplete(boolean removeOnComplete) {
+ this.removeOnComplete = removeOnComplete;
+ return this;
+ }
+
/**
* Build a new instance of {@link BackOff}
*/
public BackOff build() {
- return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts,
multiplier);
+ return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts,
multiplier, removeOnComplete);
}
}
}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
index 6dc1138cdbd..de789ca63f9 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
@@ -16,8 +16,7 @@
*/
package org.apache.camel.util.backoff;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.camel.util.function.ThrowingFunction;
@@ -25,40 +24,55 @@ import org.apache.camel.util.function.ThrowingFunction;
/**
* A simple timer utility that use a linked {@link BackOff} to determine when
a task should be executed.
*/
-public class BackOffTimer {
- private final ScheduledExecutorService scheduler;
+public interface BackOffTimer {
- public BackOffTimer(ScheduledExecutorService scheduler) {
- this.scheduler = scheduler;
- }
+ /**
+ * Schedules a task to run according to the backoff settings
+ *
+ * @param backOff the settings for how often to run the task
+ * @param function the function to call for each run
+ * @return the task
+ */
+ Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean, Exception>
function);
/**
- * Schedule the given function/task to be executed some time in the future
according to the given backOff.
+ * Gets the name of this timer.
*/
- public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean,
Exception> function) {
- final BackOffTimerTask task = new BackOffTimerTask(backOff, scheduler,
function);
-
- long delay = task.next();
- if (delay != BackOff.NEVER) {
- scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
- } else {
- task.cancel();
- }
+ String getName();
- return task;
- }
+ /**
+ * Removes the task
+ */
+ void remove(Task task);
+
+ /**
+ * Access to unmodifiable set of all the tasks
+ */
+ Set<Task> getTasks();
+
+ /**
+ * Number of tasks
+ */
+ int size();
// ****************************************
// TimerTask
// ****************************************
- public interface Task {
+ interface Task {
enum Status {
Active,
Inactive,
- Exhausted
+ Exhausted,
+ Completed,
+ Failed
}
+ /**
+ * Name of this task
+ */
+ String getName();
+
/**
* The back-off associated with this task.
*/
@@ -99,6 +113,11 @@ public class BackOffTimer {
*/
long getNextAttemptTime();
+ /**
+ * The task failed for some un-expected exception
+ */
+ Throwable getException();
+
/**
* Reset the task.
*/
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
index 0d6e6bc693c..85f30d1f0fc 100644
---
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
+++
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/BackOffTimerTask.java
@@ -28,8 +28,9 @@ import java.util.function.BiConsumer;
import org.apache.camel.util.function.ThrowingFunction;
-final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
+public final class BackOffTimerTask implements BackOffTimer.Task, Runnable {
private final Lock lock = new ReentrantLock();
+ private final BackOffTimer timer;
private final BackOff backOff;
private final ScheduledExecutorService scheduler;
private final ThrowingFunction<BackOffTimer.Task, Boolean, Exception>
function;
@@ -43,9 +44,11 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
private long currentElapsedTime;
private long lastAttemptTime;
private long nextAttemptTime;
+ private Throwable cause;
- BackOffTimerTask(BackOff backOff, ScheduledExecutorService scheduler,
- ThrowingFunction<BackOffTimer.Task, Boolean, Exception>
function) {
+ public BackOffTimerTask(BackOffTimer timer, BackOff backOff,
ScheduledExecutorService scheduler,
+ ThrowingFunction<BackOffTimer.Task, Boolean,
Exception> function) {
+ this.timer = timer;
this.backOff = backOff;
this.scheduler = scheduler;
this.status = Status.Active;
@@ -66,6 +69,11 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
// Properties
// *****************************
+ @Override
+ public String getName() {
+ return timer.getName();
+ }
+
@Override
public BackOff getBackOff() {
return backOff;
@@ -106,15 +114,21 @@ final class BackOffTimerTask implements
BackOffTimer.Task, Runnable {
return nextAttemptTime;
}
+ @Override
+ public Throwable getException() {
+ return cause;
+ }
+
@Override
public void reset() {
this.currentAttempts = 0;
this.currentDelay = 0;
this.currentElapsedTime = 0;
- this.firstAttemptTime = 0;
+ this.firstAttemptTime = BackOff.NEVER;
this.lastAttemptTime = BackOff.NEVER;
this.nextAttemptTime = BackOff.NEVER;
this.status = Status.Active;
+ this.cause = null;
}
@Override
@@ -128,12 +142,20 @@ final class BackOffTimerTask implements
BackOffTimer.Task, Runnable {
// signal task completion on cancel.
complete(null);
+
+ // the task is cancelled and should not be restarted so remove from
timer
+ if (timer != null) {
+ timer.remove(this);
+ }
}
@Override
public void whenComplete(BiConsumer<BackOffTimer.Task, Throwable>
whenCompleted) {
lock.lock();
try {
+ if (backOff.isRemoveOnComplete()) {
+ timer.remove(this);
+ }
consumers.add(whenCompleted);
} finally {
lock.unlock();
@@ -148,7 +170,7 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
public void run() {
if (status == Status.Active) {
try {
- lastAttemptTime =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ lastAttemptTime = System.currentTimeMillis();
if (firstAttemptTime < 0) {
firstAttemptTime = lastAttemptTime;
}
@@ -169,6 +191,7 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
} else {
stop();
+ status = Status.Completed;
// if the function return false no more attempts should
// be made so stop the context.
complete(null);
@@ -176,6 +199,7 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
} catch (Exception e) {
stop();
+ status = Status.Failed;
complete(e);
}
}
@@ -192,6 +216,7 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
}
void complete(Throwable throwable) {
+ this.cause = throwable;
lock.lock();
try {
consumers.forEach(c -> c.accept(this, throwable));
@@ -208,7 +233,7 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
* Return the number of milliseconds to wait before retrying the operation
or ${@link BackOff#NEVER} to indicate
* that no further attempt should be made.
*/
- long next() {
+ public long next() {
// A call to next when currentDelay is set to NEVER has no effects
// as this means that either the timer is exhausted or it has explicit
// stopped
@@ -237,7 +262,8 @@ final class BackOffTimerTask implements BackOffTimer.Task,
Runnable {
@Override
public String toString() {
return "BackOffTimerTask["
- + "status=" + status
+ + "name=" + timer.getName()
+ + ", status=" + status
+ ", currentAttempts=" + currentAttempts
+ ", currentDelay=" + currentDelay
+ ", currentElapsedTime=" + currentElapsedTime
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java
new file mode 100644
index 00000000000..b98c8e5a26f
--- /dev/null
+++
b/core/camel-util/src/main/java/org/apache/camel/util/backoff/SimpleBackOffTimer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.util.function.ThrowingFunction;
+
+/**
+ * A simple timer utility that use a linked {@link BackOff} to determine when
a task should be executed.
+ */
+public class SimpleBackOffTimer implements BackOffTimer, Closeable {
+ private final ScheduledExecutorService scheduler;
+ private final String name;
+ private final Set<BackOffTimerTask> tasks = new CopyOnWriteArraySet<>();
+
+ public SimpleBackOffTimer(ScheduledExecutorService scheduler) {
+ this("SimpleBackOffTimer", scheduler);
+ }
+
+ public SimpleBackOffTimer(String name, ScheduledExecutorService scheduler)
{
+ this.name = name;
+ this.scheduler = scheduler;
+ }
+
+ /**
+ * Schedule the given function/task to be executed some time in the future
according to the given backOff.
+ */
+ public Task schedule(BackOff backOff, ThrowingFunction<Task, Boolean,
Exception> function) {
+ final BackOffTimerTask task = new BackOffTimerTask(this, backOff,
scheduler, function);
+
+ long delay = task.next();
+ if (delay != BackOff.NEVER) {
+ tasks.add(task);
+ scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
+ } else {
+ task.cancel();
+ }
+
+ return task;
+ }
+
+ /**
+ * Gets the name of this timer.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Removes the task
+ */
+ public void remove(Task task) {
+ tasks.remove(task);
+ }
+
+ /**
+ * Access to unmodifiable set of all the tasks
+ */
+ public Set<Task> getTasks() {
+ return Collections.unmodifiableSet(tasks);
+ }
+
+ /**
+ * Number of tasks
+ */
+ public int size() {
+ return tasks.size();
+ }
+
+ /**
+ * Stops and closes this timer.
+ */
+ public void close() {
+ tasks.clear();
+ }
+}
diff --git
a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
index 1f8edf635df..4e9105d681c 100644
---
a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
+++
b/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
@@ -27,7 +27,7 @@ public class BackOffTest {
@Test
public void testSimpleBackOff() {
final BackOff backOff = BackOff.builder().build();
- final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t
-> true);
+ final BackOffTimerTask context = new BackOffTimerTask(null, backOff,
null, t -> true);
long delay;
@@ -43,7 +43,7 @@ public class BackOffTest {
@Test
public void testBackOffWithMultiplier() {
final BackOff backOff = BackOff.builder().multiplier(1.5).build();
- final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t
-> true);
+ final BackOffTimerTask context = new BackOffTimerTask(null, backOff,
null, t -> true);
long delay = BackOff.DEFAULT_DELAY.toMillis();
long oldDelay;
@@ -64,7 +64,7 @@ public class BackOffTest {
@Test
public void testBackOffWithMaxAttempts() {
final BackOff backOff = BackOff.builder().maxAttempts(5L).build();
- final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t
-> true);
+ final BackOffTimerTask context = new BackOffTimerTask(null, backOff,
null, t -> true);
long delay;
@@ -84,7 +84,7 @@ public class BackOffTest {
@Test
public void testBackOffWithMaxTime() {
final BackOff backOff = BackOff.builder().maxElapsedTime(9,
TimeUnit.SECONDS).build();
- final BackOffTimerTask context = new BackOffTimerTask(backOff, null, t
-> true);
+ final BackOffTimerTask context = new BackOffTimerTask(null, backOff,
null, t -> true);
long delay;
diff --git
a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
b/core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java
similarity index 66%
rename from
core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
rename to
core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java
index 810a3d6e662..634578742de 100644
---
a/core/camel-util/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
+++
b/core/camel-util/src/test/java/org/apache/camel/util/backoff/SimpleBackOffTimerTest.java
@@ -29,15 +29,15 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class BackOffTimerTest {
+public class SimpleBackOffTimerTest {
@Test
public void testBackOffTimer() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(3);
- final BackOff backOff = BackOff.builder().delay(100).build();
- final BackOffTimer timer = new BackOffTimer(executor);
+ final BackOff backOff =
BackOff.builder().delay(100).removeOnComplete(false).build();
+ final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor);
final AtomicLong first = new AtomicLong();
BackOffTimer.Task task = timer.schedule(
@@ -63,6 +63,10 @@ public class BackOffTimerTest {
latch.await(5, TimeUnit.SECONDS);
executor.shutdownNow();
+
+ assertEquals(1, timer.size());
+ assertEquals(BackOffTimer.Task.Status.Completed,
timer.getTasks().iterator().next().getStatus());
+ timer.close();
}
@Test
@@ -70,8 +74,8 @@ public class BackOffTimerTest {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(3);
- final BackOff backOff =
BackOff.builder().delay(100).maxAttempts(5L).build();
- final BackOffTimer timer = new BackOffTimer(executor);
+ final BackOff backOff =
BackOff.builder().delay(100).maxAttempts(5L).removeOnComplete(false).build();
+ final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor);
BackOffTimer.Task task = timer.schedule(
backOff,
@@ -92,6 +96,10 @@ public class BackOffTimerTest {
latch.await(5, TimeUnit.SECONDS);
executor.shutdownNow();
+
+ assertEquals(1, timer.size());
+ assertEquals(BackOffTimer.Task.Status.Exhausted,
timer.getTasks().iterator().next().getStatus());
+ timer.close();
}
@Test
@@ -99,8 +107,8 @@ public class BackOffTimerTest {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(3);
- final BackOff backOff =
BackOff.builder().delay(100).maxElapsedTime(400).build();
- final BackOffTimer timer = new BackOffTimer(executor);
+ final BackOff backOff =
BackOff.builder().delay(100).maxElapsedTime(400).removeOnComplete(false).build();
+ final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor);
BackOffTimer.Task task = timer.schedule(
backOff,
@@ -121,6 +129,10 @@ public class BackOffTimerTest {
latch.await(5, TimeUnit.SECONDS);
executor.shutdownNow();
+
+ assertEquals(1, timer.size());
+ assertEquals(BackOffTimer.Task.Status.Exhausted,
timer.getTasks().iterator().next().getStatus());
+ timer.close();
}
@Test
@@ -128,8 +140,8 @@ public class BackOffTimerTest {
final CountDownLatch latch = new CountDownLatch(5);
final AtomicBoolean done = new AtomicBoolean();
final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(3);
- final BackOff backOff = BackOff.builder().delay(100).build();
- final BackOffTimer timer = new BackOffTimer(executor);
+ final BackOff backOff =
BackOff.builder().delay(100).removeOnComplete(false).build();
+ final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor);
BackOffTimer.Task task = timer.schedule(
backOff,
@@ -148,9 +160,53 @@ public class BackOffTimerTest {
});
latch.await(2, TimeUnit.SECONDS);
+ assertEquals(1, timer.size());
+ assertEquals(BackOffTimer.Task.Status.Completed,
timer.getTasks().iterator().next().getStatus());
task.cancel();
+ assertEquals(0, timer.size());
assertTrue(done.get());
executor.shutdownNow();
+
+ timer.close();
}
+
+ @Test
+ public void testBackOffTimerRemoveOnComplete() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger counter = new AtomicInteger();
+ final ScheduledExecutorService executor =
Executors.newScheduledThreadPool(3);
+ final BackOff backOff =
BackOff.builder().delay(100).removeOnComplete(true).build();
+ final SimpleBackOffTimer timer = new SimpleBackOffTimer(executor);
+ final AtomicLong first = new AtomicLong();
+
+ BackOffTimer.Task task = timer.schedule(
+ backOff,
+ context -> {
+ assertEquals(counter.incrementAndGet(),
context.getCurrentAttempts());
+ assertEquals(100, context.getCurrentDelay());
+ assertEquals(100L * counter.get(),
context.getCurrentElapsedTime());
+ if (first.get() == 0) {
+ first.set(context.getFirstAttemptTime());
+ } else {
+ assertEquals(first.get(),
context.getFirstAttemptTime());
+ }
+
+ return counter.get() < 5;
+ });
+
+ task.whenComplete(
+ (context, throwable) -> {
+ assertEquals(5, counter.get());
+ latch.countDown();
+ });
+
+ latch.await(5, TimeUnit.SECONDS);
+ executor.shutdownNow();
+
+ // task is removed
+ assertEquals(0, timer.size());
+ timer.close();
+ }
+
}
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc
index a78051592d6..bdf9eb9ad88 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc
@@ -11,6 +11,21 @@ from both 4.0 to 4.1 and 4.1 to 4.2.
Added a 2nd `lookup` method to `org.apache.camel.spi.TypeConverterRegistry`
and changed the `addConverter` to no longer have
an empty default noop implementation in the interface.
+The class `org.apache.camel.util.backoff.BackOffTimer` has been refactored as
an interface,
+and the basic implementation is
`org.apache.camel.util.backoff.SimpleBackOffTimer` in `camel-util` JAR.
+
+To get hold of a `BackOffTimer` then use the new factory as shown below:
+
+[source,java]
+----
+BackOffTimer timer =
PluginHelper.getBackOffTimerFactory(camelContext.getCamelContextExtension())
+ .newBackOffTimer("NameOfTimer", executorService);
+----
+
+The `BackOffTimer` is mostly used internally in Camel components to conduct
tasks that should
+be repeated until completed, such as recovery tasks. And as such this refactor
is not
+expected to impact Camel end users.
+
=== camel-file / camel-ftp / camel-smb / camel-azure-files
When using `poll` or `pollEnrich` with the file based components, then the
`eagerLimitMaxMessagesPerPoll` option
diff --git
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index eb2434e0c1a..7ee908eabb6 100644
---
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -1058,6 +1058,13 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
root.put("receive", json);
}
}
+ DevConsole dc24 = dcr.resolveById("backoff");
+ if (dc24 != null) {
+ JsonObject json = (JsonObject)
dc24.call(DevConsole.MediaType.JSON);
+ if (json != null && !json.isEmpty()) {
+ root.put("backoff", json);
+ }
+ }
}
// various details
JsonObject mem = collectMemory();
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index b18d6cd299d..3d628dfa5f4 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -108,6 +108,7 @@ public class CamelJBangMain implements Callable<Integer> {
.addSubcommand("event", new CommandLine(new
ListEvent(main)))
.addSubcommand("inflight", new CommandLine(new
ListInflight(main)))
.addSubcommand("blocked", new CommandLine(new
ListBlocked(main)))
+ .addSubcommand("backoff", new CommandLine(new
ListBackOff(main)))
.addSubcommand("bean", new CommandLine(new
CamelBeanDump(main)))
.addSubcommand("route-controller", new CommandLine(new
RouteControllerAction(main)))
.addSubcommand("transformer", new CommandLine(new
ListTransformer(main)))
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java
new file mode 100644
index 00000000000..3e17c8a378e
--- /dev/null
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.process;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+@Command(name = "backoff",
+ description = "Get back-off tasks of Camel integrations", sortOptions
= false, showDefaultValues = true)
+public class ListBackOff extends ProcessWatchCommand {
+
+ @CommandLine.Parameters(description = "Name or pid of running Camel
integration", arity = "0..1")
+ String name = "*";
+
+ @CommandLine.Option(names = { "--sort" }, completionCandidates =
PidNameAgeCompletionCandidates.class,
+ description = "Sort by pid, name or age", defaultValue
= "pid")
+ String sort;
+
+ public ListBackOff(CamelJBangMain main) {
+ super(main);
+ }
+
+ @Override
+ public Integer doProcessWatchCall() throws Exception {
+ List<Row> rows = new ArrayList<>();
+
+ List<Long> pids = findPids(name);
+ ProcessHandle.allProcesses()
+ .filter(ph -> pids.contains(ph.pid()))
+ .forEach(ph -> {
+ JsonObject root = loadStatus(ph.pid());
+ // there must be a status file for the running Camel
integration
+ if (root != null) {
+ Row row = new Row();
+ JsonObject context = (JsonObject) root.get("context");
+ if (context == null) {
+ return;
+ }
+ row.name = context.getString("name");
+ if ("CamelJBang".equals(row.name)) {
+ row.name = ProcessHelper.extractName(root, ph);
+ }
+ row.pid = Long.toString(ph.pid());
+ row.uptime = extractSince(ph);
+ row.age = TimeUtils.printSince(row.uptime);
+
+ JsonObject jo = (JsonObject) root.get("backoff");
+ if (jo != null) {
+ JsonArray arr = (JsonArray) jo.get("timers");
+ if (arr != null) {
+ for (int i = 0; i < arr.size(); i++) {
+ jo = (JsonObject) arr.get(i);
+ JsonArray arr2 = (JsonArray)
jo.get("tasks");
+ for (int j = 0; j < arr2.size(); j++) {
+ jo = (JsonObject) arr2.get(j);
+ row = row.copy();
+ row.task = jo.getString("name");
+ row.status = jo.getString("status");
+ row.attempts = jo.getLong("attempts");
+ row.delay = jo.getLong("delay");
+ row.elapsed = jo.getLong("elapsed");
+ row.firstTime =
jo.getLong("firstTime");
+ row.lastTime = jo.getLong("lastTime");
+ row.nextTime = jo.getLong("nextTime");
+ row.error = jo.getString("error");
+ rows.add(row);
+ }
+ }
+ }
+ }
+ }
+ });
+
+ // sort rows
+ rows.sort(this::sortRow);
+
+ if (!rows.isEmpty()) {
+ printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows,
Arrays.asList(
+ new
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+ new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
+ .with(r -> r.name),
+ new
Column().header("TASK").dataAlign(HorizontalAlign.LEFT).with(r -> r.task),
+ new
Column().header("STATUS").dataAlign(HorizontalAlign.LEFT).with(r -> r.status),
+ new
Column().header("ATTEMPT").dataAlign(HorizontalAlign.LEFT).with(r -> "" +
r.attempts),
+ new
Column().header("DELAY").dataAlign(HorizontalAlign.LEFT).with(r -> "" +
r.delay),
+ new
Column().header("ELAPSED").dataAlign(HorizontalAlign.LEFT).with(this::getElapsed),
+ new
Column().header("FIRST").dataAlign(HorizontalAlign.LEFT).with(this::getFirst),
+ new
Column().header("LAST").dataAlign(HorizontalAlign.LEFT).with(this::getLast),
+ new
Column().header("NEXT").dataAlign(HorizontalAlign.LEFT).with(this::getNext),
+ new
Column().header("FAILURE").dataAlign(HorizontalAlign.LEFT)
+ .maxWidth(140, OverflowBehaviour.NEWLINE)
+ .with(r -> r.error))));
+ }
+
+ return 0;
+ }
+
+ private String getElapsed(Row r) {
+ return TimeUtils.printAge(r.elapsed);
+ }
+
+ private String getFirst(Row r) {
+ if (r.firstTime > 0) {
+ return TimeUtils.printSince(r.firstTime);
+ }
+ return "";
+ }
+
+ private String getLast(Row r) {
+ if (r.lastTime > 0) {
+ return TimeUtils.printSince(r.lastTime, true);
+ }
+ return "";
+ }
+
+ private String getNext(Row r) {
+ if (r.nextTime > 0) {
+ long age = r.nextTime - System.currentTimeMillis();
+ return TimeUtils.printDuration(age, true);
+ }
+ return "";
+ }
+
+ protected int sortRow(Row o1, Row o2) {
+ String s = sort;
+ int negate = 1;
+ if (s.startsWith("-")) {
+ s = s.substring(1);
+ negate = -1;
+ }
+ switch (s) {
+ case "pid":
+ return Long.compare(Long.parseLong(o1.pid),
Long.parseLong(o2.pid)) * negate;
+ case "name":
+ return o1.name.compareToIgnoreCase(o2.name) * negate;
+ case "age":
+ return Long.compare(o1.uptime, o2.uptime) * negate;
+ default:
+ return 0;
+ }
+ }
+
+ private static class Row implements Cloneable {
+ String pid;
+ String name;
+ long uptime;
+ String age;
+ String task;
+ String status;
+ long attempts;
+ long delay;
+ long elapsed;
+ long firstTime;
+ long lastTime;
+ long nextTime;
+ String error;
+
+ Row copy() {
+ try {
+ return (Row) clone();
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+ }
+
+}
diff --git
a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java
b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java
index b348bb97013..56dec5ef545 100644
---
a/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java
+++
b/dsl/camel-kamelet-main/src/main/java/org/apache/camel/main/download/DependencyDownloaderPropertiesComponent.java
@@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.camel.CamelContext;
import org.apache.camel.Expression;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StaticService;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.Language;
import org.apache.camel.support.DefaultExchange;
@@ -33,7 +34,7 @@ import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DependencyDownloaderPropertiesComponent extends ServiceSupport {
+public class DependencyDownloaderPropertiesComponent extends ServiceSupport
implements StaticService {
private static final Logger LOG =
LoggerFactory.getLogger(DependencyDownloaderPropertiesComponent.class);