This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 61b3f8e3024 CAMEL-21322: camel-core: Add
RouteRestartingEvent/RouteRestartingFailureEvent to keep track of route during
supervising startup of routes. (#15843)
61b3f8e3024 is described below
commit 61b3f8e3024ec338d7a4e35e36614743900df4aa
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Oct 6 18:36:57 2024 +0200
CAMEL-21322: camel-core: Add
RouteRestartingEvent/RouteRestartingFailureEvent to keep track of route during
supervising startup of routes. (#15843)
---
...mppTRXProducerSupervisingRouteControllerIT.java | 20 ++++++
.../main/java/org/apache/camel/spi/CamelEvent.java | 34 ++++++++++
.../java/org/apache/camel/spi/EventFactory.java | 21 ++++++
.../engine/DefaultSupervisingRouteController.java | 13 +++-
.../camel/impl/event/DefaultEventFactory.java | 18 +++++
.../camel/impl/event/RouteRestartingEvent.java | 42 ++++++++++++
.../impl/event/RouteRestartingFailureEvent.java | 57 ++++++++++++++++
.../DefaultSupervisingRouteControllerTest.java | 47 +++++++++++++
.../java/org/apache/camel/support/EventHelper.java | 79 ++++++++++++++++++++++
9 files changed, 329 insertions(+), 2 deletions(-)
diff --git
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
index 08f78639b7b..74092f2f11a 100644
---
a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
+++
b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppTRXProducerSupervisingRouteControllerIT.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.smpp.integration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -29,7 +31,9 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.smpp.SmppConstants;
import org.apache.camel.component.smpp.SmppMessageType;
+import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.SimpleEventNotifierSupport;
import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.jsmpp.examples.SMPPServerSimulator;
@@ -41,6 +45,7 @@ import org.junit.jupiter.api.parallel.Isolated;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Isolated
class SmppTRXProducerSupervisingRouteControllerIT extends CamelTestSupport {
@@ -63,6 +68,8 @@ class SmppTRXProducerSupervisingRouteControllerIT extends
CamelTestSupport {
@EndpointInject("direct:start")
private Endpoint start;
+ private List<CamelEvent.RouteRestartingEvent> events;
+
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
@@ -73,6 +80,16 @@ class SmppTRXProducerSupervisingRouteControllerIT extends
CamelTestSupport {
src.setInitialDelay(100);
src.setThreadPoolSize(2);
+ events = new ArrayList<>();
+ context.getManagementStrategy().addEventNotifier(new
SimpleEventNotifierSupport() {
+ @Override
+ public void notify(CamelEvent event) throws Exception {
+ if (event instanceof CamelEvent.RouteRestartingEvent rre) {
+ events.add(rre);
+ }
+ }
+ });
+
return context;
}
@@ -109,6 +126,9 @@ class SmppTRXProducerSupervisingRouteControllerIT extends
CamelTestSupport {
assertNotNull(exchange.getIn().getHeader(SmppConstants.ID));
assertEquals(1,
exchange.getIn().getHeader(SmppConstants.SENT_MESSAGE_COUNT));
+
+ // there should be some restart events
+ assertTrue(events.size() >= 3, "There should be restarting events,
size: " + events.size());
}
@Override
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
index d890a345f5c..8bf7aa52c0b 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
@@ -64,6 +64,8 @@ public interface CamelEvent {
RouteStarted,
RouteStopping,
RouteStopped,
+ RouteRestarting,
+ RouteRestartingFailure,
ServiceStartupFailure,
ServiceStopFailure,
StepStarted,
@@ -426,6 +428,38 @@ public interface CamelEvent {
}
}
+ interface RouteRestartingEvent extends RouteEvent {
+
+ /**
+ * Restart attempt (0 = initial start, 1 = first restart attempt)
+ */
+ long getAttempt();
+
+ @Override
+ default Type getType() {
+ return Type.RouteRestarting;
+ }
+ }
+
+ interface RouteRestartingFailureEvent extends RouteEvent, FailureEvent {
+
+ /**
+ * Failure attempt (0 = initial start, 1 = first restart attempt)
+ */
+ long getAttempt();
+
+ /**
+ * Whether all restarts have failed and the route controller will not
attempt to restart the route anymore due
+ * to maximum attempts reached and being exhausted.
+ */
+ boolean isExhausted();
+
+ @Override
+ default Type getType() {
+ return Type.RouteRestartingFailure;
+ }
+ }
+
interface ServiceEvent extends CamelEvent {
Object getService();
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
index d390f2db564..b7412a218b6 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
@@ -243,6 +243,27 @@ public interface EventFactory {
*/
CamelEvent createRouteReloaded(Route route, int index, int total);
+ /**
+ * Creates an {@link CamelEvent} for {@link Route} being restarted by
{@link SupervisingRouteController}.
+ *
+ * @param route the route
+ * @param attempt the attempt number for restarting the route
+ * @return the restarting event
+ */
+ CamelEvent createRouteRestarting(Route route, long attempt);
+
+ /**
+ * Creates an {@link CamelEvent} for {@link Route} being restarted and
failed by {@link SupervisingRouteController}.
+ *
+ * @param route the route
+ * @param attempt the attempt number for restarting the route
+ * @param cause the exception causing the failure
+ * @param exhausted whether the supervising controller is exhausted and
will not attempt to restart this route
+ * anymore
+ * @return the restarting failure event
+ */
+ CamelEvent createRouteRestartingFailure(Route route, long attempt,
Throwable cause, boolean exhausted);
+
/**
* Creates an {@link CamelEvent} when an {@link org.apache.camel.Exchange}
has been created
*
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
index 6d8e06976af..bcaaa898d1b 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultSupervisingRouteController.java
@@ -50,6 +50,7 @@ import org.apache.camel.spi.RouteError;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.EventHelper;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.util.ObjectHelper;
@@ -470,6 +471,8 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
consumer.accept(route);
} catch (Exception e) {
if (checker) {
+ // first attempt is (starting and not restarting)
+
EventHelper.notifyRouteRestartingFailure(getCamelContext(), route.get(), 0, e,
false);
// if start fails the route is moved to controller
supervision
// so its get (eventually) restarted
routeManager.start(route);
@@ -672,6 +675,7 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
try {
logger.info("Restarting route: {} attempt:
{}", r.getId(), attempt);
+
EventHelper.notifyRouteRestarting(getCamelContext(), r.get(), attempt);
doStartRoute(r, false, rx ->
DefaultSupervisingRouteController.super.startRoute(rx.getId()));
logger.info("Route: {} started after {}
attempts", r.getId(), attempt);
return false;
@@ -681,6 +685,7 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
logger.info("Failed restarting route: {}
attempt: {} due: {} (stacktrace in debug log level)",
r.getId(), attempt, cause);
logger.debug(" Error restarting route
caused by: {}", e.getMessage(), e);
+
EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempt,
e, false);
return true;
}
});
@@ -698,17 +703,21 @@ public class DefaultSupervisingRouteController extends
DefaultRouteController im
if (backOffTask != null &&
backOffTask.getStatus() == BackOffTimer.Task.Status.Exhausted
&& stopped) {
+ long attempts =
backOffTask.getCurrentAttempts() - 1;
LOG.warn(
"Restarting route: {} is
exhausted after {} attempts. No more attempts will be made"
+ " and the route is no
longer supervised by this route controller and remains as stopped.",
- route.getId(),
backOffTask.getCurrentAttempts() - 1);
+ route.getId(), attempts);
r.get().setRouteController(null);
// remember exhausted routes
routeManager.exhausted.put(r, task);
+ // store as last error on route as it
was exhausted
+ Throwable t =
getRestartException(route.getId());
+
EventHelper.notifyRouteRestartingFailure(getCamelContext(), r.get(), attempts,
t, true);
+
if (unhealthyOnExhausted) {
// store as last error on route as
it was exhausted
- Throwable t =
getRestartException(route.getId());
if (t != null) {
DefaultRouteError.set(getCamelContext(), r.getId(), RouteError.Phase.START, t,
true);
diff --git
a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
index 2b53623566e..4c1695d8421 100644
---
a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
+++
b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
@@ -213,6 +213,24 @@ public class DefaultEventFactory implements EventFactory {
return answer;
}
+ @Override
+ public CamelEvent createRouteRestarting(Route route, long attempt) {
+ CamelEvent answer = new RouteRestartingEvent(route, attempt);
+ if (timestampEnabled) {
+ answer.setTimestamp(System.currentTimeMillis());
+ }
+ return answer;
+ }
+
+ @Override
+ public CamelEvent createRouteRestartingFailure(Route route, long attempt,
Throwable cause, boolean exhausted) {
+ CamelEvent answer = new RouteRestartingFailureEvent(route, attempt,
cause, exhausted);
+ if (timestampEnabled) {
+ answer.setTimestamp(System.currentTimeMillis());
+ }
+ return answer;
+ }
+
@Override
public CamelEvent createRouteStoppingEvent(Route route) {
CamelEvent answer = new RouteStoppingEvent(route);
diff --git
a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
new file mode 100644
index 00000000000..8f9f378b94f
--- /dev/null
+++
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import java.io.Serial;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+public class RouteRestartingEvent extends AbstractRouteEvent implements
CamelEvent.RouteRestartingEvent {
+ private static final @Serial long serialVersionUID = 1330257282431407330L;
+ private final long attempt;
+
+ public RouteRestartingEvent(Route source, long attempt) {
+ super(source);
+ this.attempt = attempt;
+ }
+
+ @Override
+ public long getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public String toString() {
+ return "Route restarting: " + getRoute().getId() + " (attempt: " +
attempt + ")";
+ }
+}
diff --git
a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
new file mode 100644
index 00000000000..38be171ec64
--- /dev/null
+++
b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteRestartingFailureEvent.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import java.io.Serial;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+public class RouteRestartingFailureEvent extends AbstractRouteEvent implements
CamelEvent.RouteRestartingFailureEvent {
+ private static final @Serial long serialVersionUID = 1330257282431407331L;
+ private final long attempt;
+ private final Throwable cause;
+ private final boolean exhausted;
+
+ public RouteRestartingFailureEvent(Route source, long attempt, Throwable
cause, boolean exhausted) {
+ super(source);
+ this.attempt = attempt;
+ this.cause = cause;
+ this.exhausted = exhausted;
+ }
+
+ public long getAttempt() {
+ return attempt;
+ }
+
+ @Override
+ public Throwable getCause() {
+ return cause;
+ }
+
+ @Override
+ public boolean isExhausted() {
+ return exhausted;
+ }
+
+ @Override
+ public String toString() {
+ return "Route " + (attempt == 0 ? "starting " : "restarting ") +
(exhausted ? "exhausted: " : "failed: ")
+ + getRoute().getId() + " (attempt: " + attempt
+ + ") due to " + cause.getMessage();
+ }
+}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
index b2fe6490b8e..d0e6ee37c58 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/engine/DefaultSupervisingRouteControllerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.impl.engine;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -28,7 +30,10 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.camel.component.seda.SedaConsumer;
import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.impl.event.RouteRestartingEvent;
+import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.SupervisingRouteController;
+import org.apache.camel.support.SimpleEventNotifierSupport;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
@@ -52,6 +57,20 @@ public class DefaultSupervisingRouteControllerTest extends
ContextTestSupport {
src.setInitialDelay(100);
src.setThreadPoolSize(2);
+ List<CamelEvent.RouteRestartingFailureEvent> failure = new
ArrayList<>();
+ List<CamelEvent.RouteRestartingEvent> events = new ArrayList<>();
+
+ context.getManagementStrategy().addEventNotifier(new
SimpleEventNotifierSupport() {
+ @Override
+ public void notify(CamelEvent event) throws Exception {
+ if (event instanceof CamelEvent.RouteRestartingFailureEvent
rfe) {
+ failure.add(rfe);
+ } else if (event instanceof RouteRestartingEvent rre) {
+ events.add(rre);
+ }
+ }
+ });
+
context.start();
MockEndpoint mock = context.getEndpoint("mock:foo",
MockEndpoint.class);
@@ -82,6 +101,15 @@ public class DefaultSupervisingRouteControllerTest extends
ContextTestSupport {
// bar is no auto startup
assertEquals("Stopped",
context.getRouteController().getRouteStatus("bar").toString());
+
+ // 2 x 1 initial + 2 x 3 restart failure + 2 x 1 exhausted
+ assertEquals(10, failure.size());
+ // 2 x 3 restart attempts
+ assertEquals(6, events.size());
+
+ // last should be exhausted
+ assertTrue(failure.get(8).isExhausted());
+ assertTrue(failure.get(9).isExhausted());
}
@Test
@@ -96,6 +124,20 @@ public class DefaultSupervisingRouteControllerTest extends
ContextTestSupport {
src.setInitialDelay(100);
src.setThreadPoolSize(2);
+ List<CamelEvent.RouteRestartingFailureEvent> failure = new
ArrayList<>();
+ List<CamelEvent.RouteRestartingEvent> events = new ArrayList<>();
+
+ context.getManagementStrategy().addEventNotifier(new
SimpleEventNotifierSupport() {
+ @Override
+ public void notify(CamelEvent event) throws Exception {
+ if (event instanceof CamelEvent.RouteRestartingFailureEvent
rfe) {
+ failure.add(rfe);
+ } else if (event instanceof RouteRestartingEvent rre) {
+ events.add(rre);
+ }
+ }
+ });
+
context.start();
MockEndpoint mock = context.getEndpoint("mock:foo",
MockEndpoint.class);
@@ -118,6 +160,11 @@ public class DefaultSupervisingRouteControllerTest extends
ContextTestSupport {
assertEquals("Started",
context.getRouteController().getRouteStatus("cake").toString());
// bar is no auto startup
assertEquals("Stopped",
context.getRouteController().getRouteStatus("bar").toString());
+
+ // 2 x 1 initial + 2 x 4 restart failure attempts
+ assertEquals(10, failure.size());
+ // 2 x 5 restart attempts
+ assertEquals(10, events.size());
}
private static class MyRoute extends RouteBuilder {
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
index f694ef00563..6bc547ae6a4 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
@@ -266,6 +266,85 @@ public final class EventHelper {
return answer;
}
+ public static boolean notifyRouteRestarting(CamelContext context, Route
route, long attempt) {
+ ManagementStrategy management = context.getManagementStrategy();
+ if (management == null) {
+ return false;
+ }
+
+ EventFactory factory = management.getEventFactory();
+ if (factory == null) {
+ return false;
+ }
+
+ List<EventNotifier> notifiers = management.getStartedEventNotifiers();
+ if (notifiers == null || notifiers.isEmpty()) {
+ return false;
+ }
+
+ boolean answer = false;
+ CamelEvent event = null;
+ for (EventNotifier notifier : notifiers) {
+ if (notifier.isDisabled()) {
+ continue;
+ }
+ if (notifier.isIgnoreRouteEvents()) {
+ continue;
+ }
+
+ if (event == null) {
+ // only create event once
+ event = factory.createRouteRestarting(route, attempt);
+ if (event == null) {
+ // factory could not create event so exit
+ return false;
+ }
+ }
+ answer |= doNotifyEvent(notifier, event);
+ }
+ return answer;
+ }
+
+ public static boolean notifyRouteRestartingFailure(
+ CamelContext context, Route route, long attempt, Throwable cause,
boolean exhausted) {
+ ManagementStrategy management = context.getManagementStrategy();
+ if (management == null) {
+ return false;
+ }
+
+ EventFactory factory = management.getEventFactory();
+ if (factory == null) {
+ return false;
+ }
+
+ List<EventNotifier> notifiers = management.getStartedEventNotifiers();
+ if (notifiers == null || notifiers.isEmpty()) {
+ return false;
+ }
+
+ boolean answer = false;
+ CamelEvent event = null;
+ for (EventNotifier notifier : notifiers) {
+ if (notifier.isDisabled()) {
+ continue;
+ }
+ if (notifier.isIgnoreRouteEvents()) {
+ continue;
+ }
+
+ if (event == null) {
+ // only create event once
+ event = factory.createRouteRestartingFailure(route, attempt,
cause, exhausted);
+ if (event == null) {
+ // factory could not create event so exit
+ return false;
+ }
+ }
+ answer |= doNotifyEvent(notifier, event);
+ }
+ return answer;
+ }
+
public static boolean notifyRouteStarted(CamelContext context, Route
route) {
ManagementStrategy management = context.getManagementStrategy();
if (management == null) {