This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 350c5fb0f60 CAMEL-22146: camel-pgevent - Re-connect in case of lost
connection to… (#18305)
350c5fb0f60 is described below
commit 350c5fb0f60c888e4f0a7709bccf2b7fd83e2ec5
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jun 6 10:52:52 2025 +0200
CAMEL-22146: camel-pgevent - Re-connect in case of lost connection to…
(#18305)
* CAMEL-22146: camel-pgevent - Re-connect in case of lost connection to
database.
---
.../apache/camel/catalog/components/pgevent.json | 12 +-
components/camel-pgevent/pom.xml | 6 -
.../pgevent/PgEventEndpointConfigurer.java | 24 ++++
.../pgevent/PgEventEndpointUriFactory.java | 6 +-
.../apache/camel/component/pgevent/pgevent.json | 12 +-
.../camel/component/pgevent/PgEventConsumer.java | 136 ++++++++++++++-----
.../camel/component/pgevent/PgEventEndpoint.java | 59 +++++++++
.../apache/camel/pgevent/PgEventConsumerTest.java | 127 ------------------
.../apache/camel/pgevent/PgEventHelperTest.java | 65 ----------
.../apache/camel/pgevent/PgEventProducerTest.java | 144 ---------------------
.../dsl/PgEventEndpointBuilderFactory.java | 130 +++++++++++++++++++
11 files changed, 340 insertions(+), 381 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
index 9abe4d1d5b2..1f11c8b900e 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
@@ -39,9 +39,13 @@
"bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By def [...]
"exchangePattern": { "index": 6, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produc [...]
- "datasource": { "index": 8, "kind": "parameter", "displayName":
"Datasource", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false,
"autowired": false, "secret": false, "description": "To connect using the given
javax.sql.DataSource instead of using hostname and port." },
- "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group":
"security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for login" },
- "user": { "index": 10, "kind": "parameter", "displayName": "User",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "defaultValue": "postgres", "description": "Username for login"
}
+ "reconnectDelay": { "index": 7, "kind": "parameter", "displayName":
"Reconnect Delay", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000,
"description": "When the consumer unexpectedly loses connection to the database,
then this specifies the interval (millis) between re-connection attempts to
establish a new connection." },
+ "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker
Pool", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"java.util.concurrent.ExecutorService", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom worker pool for
processing the events from the database." },
+ "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName":
"Worker Pool Core Size", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1,
"description": "Number of core threads in the worker pool for processing the
events from the database." },
+ "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName":
"Worker Pool Max Size", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10,
"description": "Maximum number of threads in the worker pool for processing the
events from the database." },
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "datasource": { "index": 12, "kind": "parameter", "displayName":
"Datasource", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false,
"autowired": false, "secret": false, "description": "To connect using the given
javax.sql.DataSource instead of using hostname and port." },
+ "pass": { "index": 13, "kind": "parameter", "displayName": "Pass",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for login" },
+ "user": { "index": 14, "kind": "parameter", "displayName": "User",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "defaultValue": "postgres", "description": "Username for login"
}
}
}
diff --git a/components/camel-pgevent/pom.xml b/components/camel-pgevent/pom.xml
index f037ced041b..73e36df970d 100644
--- a/components/camel-pgevent/pom.xml
+++ b/components/camel-pgevent/pom.xml
@@ -52,12 +52,6 @@
<artifactId>camel-test-junit5</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-junit-jupiter</artifactId>
- <version>${mockito-version}</version>
- <scope>test</scope>
- </dependency>
<!-- test infra -->
<dependency>
diff --git
a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
index 756e75bb17e..e453dfaf8d9 100644
---
a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
+++
b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
@@ -33,7 +33,15 @@ public class PgEventEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "lazystartproducer":
case "lazyStartProducer":
target.setLazyStartProducer(property(camelContext, boolean.class, value));
return true;
case "pass": target.setPass(property(camelContext,
java.lang.String.class, value)); return true;
+ case "reconnectdelay":
+ case "reconnectDelay": target.setReconnectDelay(property(camelContext,
int.class, value)); return true;
case "user": target.setUser(property(camelContext,
java.lang.String.class, value)); return true;
+ case "workerpool":
+ case "workerPool": target.setWorkerPool(property(camelContext,
java.util.concurrent.ExecutorService.class, value)); return true;
+ case "workerpoolcoresize":
+ case "workerPoolCoreSize":
target.setWorkerPoolCoreSize(property(camelContext, int.class, value)); return
true;
+ case "workerpoolmaxsize":
+ case "workerPoolMaxSize":
target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return
true;
default: return false;
}
}
@@ -51,7 +59,15 @@ public class PgEventEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
case "pass": return java.lang.String.class;
+ case "reconnectdelay":
+ case "reconnectDelay": return int.class;
case "user": return java.lang.String.class;
+ case "workerpool":
+ case "workerPool": return java.util.concurrent.ExecutorService.class;
+ case "workerpoolcoresize":
+ case "workerPoolCoreSize": return int.class;
+ case "workerpoolmaxsize":
+ case "workerPoolMaxSize": return int.class;
default: return null;
}
}
@@ -70,7 +86,15 @@ public class PgEventEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
case "pass": return target.getPass();
+ case "reconnectdelay":
+ case "reconnectDelay": return target.getReconnectDelay();
case "user": return target.getUser();
+ case "workerpool":
+ case "workerPool": return target.getWorkerPool();
+ case "workerpoolcoresize":
+ case "workerPoolCoreSize": return target.getWorkerPoolCoreSize();
+ case "workerpoolmaxsize":
+ case "workerPoolMaxSize": return target.getWorkerPoolMaxSize();
default: return null;
}
}
diff --git
a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
index dde682c9feb..b0d1353eef6 100644
---
a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
+++
b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class PgEventEndpointUriFactory extends
org.apache.camel.support.componen
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(11);
+ Set<String> props = new HashSet<>(15);
props.add("bridgeErrorHandler");
props.add("channel");
props.add("database");
@@ -34,7 +34,11 @@ public class PgEventEndpointUriFactory extends
org.apache.camel.support.componen
props.add("lazyStartProducer");
props.add("pass");
props.add("port");
+ props.add("reconnectDelay");
props.add("user");
+ props.add("workerPool");
+ props.add("workerPoolCoreSize");
+ props.add("workerPoolMaxSize");
PROPERTY_NAMES = Collections.unmodifiableSet(props);
Set<String> secretProps = new HashSet<>(2);
secretProps.add("pass");
diff --git
a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
index 9abe4d1d5b2..1f11c8b900e 100644
---
a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
+++
b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
@@ -39,9 +39,13 @@
"bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By def [...]
"exchangePattern": { "index": 6, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produc [...]
- "datasource": { "index": 8, "kind": "parameter", "displayName":
"Datasource", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false,
"autowired": false, "secret": false, "description": "To connect using the given
javax.sql.DataSource instead of using hostname and port." },
- "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group":
"security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for login" },
- "user": { "index": 10, "kind": "parameter", "displayName": "User",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "defaultValue": "postgres", "description": "Username for login"
}
+ "reconnectDelay": { "index": 7, "kind": "parameter", "displayName":
"Reconnect Delay", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000,
"description": "When the consumer unexpectedly loses connection to the database,
then this specifies the interval (millis) between re-connection attempts to
establish a new connection." },
+ "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker
Pool", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"java.util.concurrent.ExecutorService", "deprecated": false, "autowired":
false, "secret": false, "description": "To use a custom worker pool for
processing the events from the database." },
+ "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName":
"Worker Pool Core Size", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1,
"description": "Number of core threads in the worker pool for processing the
events from the database." },
+ "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName":
"Worker Pool Max Size", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "integer", "javaType": "int",
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10,
"description": "Maximum number of threads in the worker pool for processing the
events from the database." },
+ "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "datasource": { "index": 12, "kind": "parameter", "displayName":
"Datasource", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false,
"autowired": false, "secret": false, "description": "To connect using the given
javax.sql.DataSource instead of using hostname and port." },
+ "pass": { "index": 13, "kind": "parameter", "displayName": "Pass",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for login" },
+ "user": { "index": 14, "kind": "parameter", "displayName": "User",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "defaultValue": "postgres", "description": "Username for login"
}
}
}
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index e0ce6dfe92e..2b0509fb0b4 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.pgevent;
import java.sql.PreparedStatement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
@@ -24,68 +26,142 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The PgEvent consumer.
*/
-public class PgEventConsumer extends DefaultConsumer implements
PGNotificationListener {
+public class PgEventConsumer extends DefaultConsumer {
private static final Logger LOG =
LoggerFactory.getLogger(PgEventConsumer.class);
+ private final PgEventListener listener = new PgEventListener();
private final PgEventEndpoint endpoint;
private PGConnection dbConnection;
+ private ScheduledExecutorService reconnectPool;
+ private ExecutorService workerPool;
+ private boolean shutdownWorkerPool;
+ private BackOffTimer timer;
public PgEventConsumer(PgEventEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
+ public PgEventListener getPgEventListener() {
+ return listener;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
- dbConnection = endpoint.initJdbc();
- String sql = String.format("LISTEN %s", endpoint.getChannel());
- try (PreparedStatement statement = dbConnection.prepareStatement(sql))
{
- statement.execute();
+ if (endpoint.getWorkerPool() != null) {
+ workerPool = endpoint.getWorkerPool();
+ } else {
+ workerPool = endpoint.createWorkerPool();
+ shutdownWorkerPool = true;
}
- dbConnection.addNotificationListener(endpoint.getChannel(),
endpoint.getChannel(), this);
+ // used for re-connecting to the database
+ reconnectPool =
getEndpoint().getCamelContext().getExecutorServiceManager()
+ .newSingleThreadScheduledExecutor(this, "Reconnector");
+ timer = new BackOffTimer(reconnectPool);
+ listener.initConnection();
}
@Override
- public void notification(int processId, String channel, String payload) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Notification processId: {}, channel: {}, payload: {}",
processId, channel, payload);
+ protected void doStop() throws Exception {
+ super.doStop();
+ listener.closeConnection();
+
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(reconnectPool);
+ timer = null;
+ if (shutdownWorkerPool && workerPool != null) {
+ LOG.debug("Shutting down PgEventConsumer worker threads with
timeout {} millis", 10000);
+
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool,
10000);
+ workerPool = null;
}
+ }
- Exchange exchange = createExchange(false);
- Message msg = exchange.getIn();
- msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel);
- msg.setBody(payload);
+ public class PgEventListener implements PGNotificationListener {
- try {
- getProcessor().process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- if (exchange.getException() != null) {
- String cause = "Unable to process incoming notification from
PostgreSQL: processId='" + processId + "', channel='"
- + channel + "', payload='" + payload + "'";
- getExceptionHandler().handleException(cause,
exchange.getException());
+ public void reconnect() {
+ BackOff bo =
BackOff.builder().delay(endpoint.getReconnectDelay()).build();
+ timer.schedule(bo, t -> {
+ LOG.debug("Connecting attempt #" + t.getCurrentAttempts());
+ try {
+ initConnection();
+ } catch (Exception e) {
+ return true;
+ }
+ LOG.debug("Connecting successful");
+ return false;
+ });
}
- releaseExchange(exchange, false);
- }
- @Override
- protected void doStop() throws Exception {
- if (dbConnection != null) {
- dbConnection.removeNotificationListener(endpoint.getChannel());
- String sql = String.format("UNLISTEN %s", endpoint.getChannel());
+ public void initConnection() throws Exception {
+ dbConnection = endpoint.initJdbc();
+ String sql = String.format("LISTEN %s", endpoint.getChannel());
try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) {
statement.execute();
}
- dbConnection.close();
+ dbConnection.addNotificationListener(endpoint.getChannel(),
endpoint.getChannel(), listener);
+ }
+
+ public void closeConnection() throws Exception {
+ if (dbConnection != null) {
+ try {
+
dbConnection.removeNotificationListener(endpoint.getChannel());
+ String sql = String.format("UNLISTEN %s",
endpoint.getChannel());
+ try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) {
+ statement.execute();
+ }
+ dbConnection.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ dbConnection = null;
+ }
+
+ @Override
+ public void notification(int processId, String channel, String
payload) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification processId: {}, channel: {}, payload:
{}", processId, channel, payload);
+ }
+
+ Exchange exchange = createExchange(false);
+ Message msg = exchange.getIn();
+ msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel);
+ msg.setBody(payload);
+
+ // use worker pool to avoid blocking notification thread
+ if (workerPool != null) {
+ workerPool.submit(() -> {
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ if (exchange.getException() != null) {
+ String cause
+ = "Unable to process incoming notification
from PostgreSQL: processId='" + processId
+ + "', channel='"
+ + channel + "', payload='" + payload + "'";
+ getExceptionHandler().handleException(cause,
exchange.getException());
+ }
+ releaseExchange(exchange, false);
+ });
+ }
+ }
+
+ @Override
+ public void closed() {
+ // connection lost, so we need to re-connect
+ LOG.warn("Connection to PostgreSQL lost unexpected.
Re-connecting...");
+ reconnect();
}
}
+
}
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
index ca506ffef4b..8e56d15a6b8 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pgevent;
import java.sql.DriverManager;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import javax.sql.DataSource;
@@ -70,6 +71,14 @@ public class PgEventEndpoint extends DefaultEndpoint
implements EndpointServiceL
private String pass;
@UriParam(label = "advanced")
private DataSource datasource;
+ @UriParam(label = "consumer,advanced", defaultValue = "5000")
+ private int reconnectDelay = 5000;
+ @UriParam(label = "consumer,advanced")
+ private ExecutorService workerPool;
+ @UriParam(label = "consumer,advanced", defaultValue = "1")
+ private int workerPoolCoreSize = 1;
+ @UriParam(label = "consumer,advanced", defaultValue = "10")
+ private int workerPoolMaxSize = 10;
private final String uri;
@@ -186,6 +195,11 @@ public class PgEventEndpoint extends DefaultEndpoint
implements EndpointServiceL
return consumer;
}
+ ExecutorService createWorkerPool() {
+ return
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+ "PgEventConsumer[" + channel + "]", workerPoolCoreSize,
workerPoolMaxSize);
+ }
+
public String getHost() {
return host;
}
@@ -263,4 +277,49 @@ public class PgEventEndpoint extends DefaultEndpoint
implements EndpointServiceL
public void setDatasource(DataSource datasource) {
this.datasource = datasource;
}
+
+ public int getReconnectDelay() {
+ return reconnectDelay;
+ }
+
+ /**
+ * When the consumer unexpectedly loses connection to the database, then this
specifies the interval (millis) between
+ * re-connection attempts to establish a new connection.
+ */
+ public void setReconnectDelay(int reconnectDelay) {
+ this.reconnectDelay = reconnectDelay;
+ }
+
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ /**
+ * To use a custom worker pool for processing the events from the database.
+ */
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ public int getWorkerPoolCoreSize() {
+ return workerPoolCoreSize;
+ }
+
+ /**
+ * Number of core threads in the worker pool for processing the events
from the database.
+ */
+ public void setWorkerPoolCoreSize(int workerPoolCoreSize) {
+ this.workerPoolCoreSize = workerPoolCoreSize;
+ }
+
+ public int getWorkerPoolMaxSize() {
+ return workerPoolMaxSize;
+ }
+
+ /**
+ * Maximum number of threads in the worker pool for processing the events
from the database.
+ */
+ public void setWorkerPoolMaxSize(int workerPoolMaxSize) {
+ this.workerPoolMaxSize = workerPoolMaxSize;
+ }
}
diff --git
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
deleted file mode 100644
index 6fad950412e..00000000000
---
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.PreparedStatement;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeExtension;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.component.pgevent.PgEventConsumer;
-import org.apache.camel.component.pgevent.PgEventEndpoint;
-import org.apache.camel.spi.ExchangeFactory;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventConsumerTest {
-
- @Test
- public void testPgEventConsumerStart() throws Exception {
- PGDataSource dataSource = mock(PGDataSource.class);
- PGConnection connection = mock(PGConnection.class);
- PreparedStatement statement = mock(PreparedStatement.class);
- PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
- Processor processor = mock(Processor.class);
- CamelContext camelContext = mock(CamelContext.class);
- ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
- ExchangeFactory ef = mock(ExchangeFactory.class);
-
- when(endpoint.getCamelContext()).thenReturn(camelContext);
- when(camelContext.getCamelContextExtension()).thenReturn(ecc);
- when(ecc.getExchangeFactory()).thenReturn(ef);
- when(ef.newExchangeFactory(any())).thenReturn(ef);
- when(endpoint.initJdbc()).thenReturn(connection);
- when(connection.prepareStatement("LISTEN
camel")).thenReturn(statement);
- when(endpoint.getChannel()).thenReturn("camel");
-
- PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
- consumer.start();
-
- verify(connection).addNotificationListener("camel", "camel", consumer);
- assertTrue(consumer.isStarted());
- }
-
- @Test
- public void testPgEventConsumerStop() throws Exception {
- PGDataSource dataSource = mock(PGDataSource.class);
- PGConnection connection = mock(PGConnection.class);
- PreparedStatement statement = mock(PreparedStatement.class);
- PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
- Processor processor = mock(Processor.class);
- CamelContext camelContext = mock(CamelContext.class);
- ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
- ExchangeFactory ef = mock(ExchangeFactory.class);
-
- when(endpoint.getCamelContext()).thenReturn(camelContext);
- when(camelContext.getCamelContextExtension()).thenReturn(ecc);
- when(ecc.getExchangeFactory()).thenReturn(ef);
- when(ef.newExchangeFactory(any())).thenReturn(ef);
- when(endpoint.initJdbc()).thenReturn(connection);
- when(connection.prepareStatement("LISTEN
camel")).thenReturn(statement);
- when(connection.prepareStatement("LISTEN
camel")).thenReturn(statement);
- when(endpoint.getChannel()).thenReturn("camel");
- when(connection.prepareStatement("UNLISTEN
camel")).thenReturn(statement);
-
- PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
- consumer.start();
- consumer.stop();
-
- verify(connection).removeNotificationListener("camel");
- verify(connection).close();
- assertTrue(consumer.isStopped());
- }
-
- @Test
- public void testPgEventNotification() throws Exception {
- PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
- Processor processor = mock(Processor.class);
- Exchange exchange = mock(Exchange.class);
- ExchangeExtension exchangeExtension = mock(ExchangeExtension.class);
- Message message = mock(Message.class);
- CamelContext camelContext = mock(CamelContext.class);
- ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
- ExchangeFactory ef = mock(ExchangeFactory.class);
-
- when(endpoint.getCamelContext()).thenReturn(camelContext);
- when(camelContext.getCamelContextExtension()).thenReturn(ecc);
- when(ecc.getExchangeFactory()).thenReturn(ef);
- when(ef.newExchangeFactory(any())).thenReturn(ef);
- when(ef.create(endpoint, false)).thenReturn(exchange);
- when(exchange.getExchangeExtension()).thenReturn(exchangeExtension);
- when(exchange.getIn()).thenReturn(message);
-
- PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
- consumer.notification(1, "camel", "some event");
-
- verify(message).setHeader("channel", "camel");
- verify(message).setBody("some event");
- verify(processor).process(exchange);
- }
-}
diff --git
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
deleted file mode 100644
index 3e1df152970..00000000000
---
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.Connection;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import org.apache.camel.component.pgevent.PgEventHelper;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventHelperTest {
-
- @Test
- public void testToPGConnectionWithNullConnection() {
- assertThrows(IllegalArgumentException.class,
- () -> PgEventHelper.toPGConnection(null));
- }
-
- @Test
- public void testToPGConnectionWithNonWrappedConnection() throws Exception {
- Connection originalConnection = mock(PGConnection.class);
- PGConnection actualConnection =
PgEventHelper.toPGConnection(originalConnection);
- assertSame(originalConnection, actualConnection);
- }
-
- @Test
- public void testToPGConnectionWithWrappedConnection() throws Exception {
- Connection wrapperConnection = mock(Connection.class);
- PGConnection unwrappedConnection = mock(PGConnection.class);
-
when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(true);
-
when(wrapperConnection.unwrap(PGConnection.class)).thenReturn(unwrappedConnection);
- PGConnection actualConnection =
PgEventHelper.toPGConnection(wrapperConnection);
- assertSame(unwrappedConnection, actualConnection);
- }
-
- @Test
- public void testToPGConnectionWithInvalidWrappedConnection() throws
Exception {
- Connection wrapperConnection = mock(Connection.class);
-
when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(false);
- assertThrows(IllegalStateException.class,
- () -> PgEventHelper.toPGConnection(wrapperConnection));
- }
-}
diff --git
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
deleted file mode 100644
index d5ad387f15a..00000000000
---
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.CallableStatement;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.pgevent.InvalidStateException;
-import org.apache.camel.component.pgevent.PgEventEndpoint;
-import org.apache.camel.component.pgevent.PgEventProducer;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventProducerTest {
-
- private PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
- private PGDataSource dataSource = mock(PGDataSource.class);
- private PGConnection connection = mock(PGConnection.class);
- private PreparedStatement statement = mock(PreparedStatement.class);
- private Exchange exchange = mock(Exchange.class);
- private Message message = mock(Message.class);
-
- @Test
- public void testPgEventProducerStart() throws Exception {
- when(endpoint.getDatasource()).thenReturn(dataSource);
- when(dataSource.getConnection()).thenReturn(connection);
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
-
- assertTrue(producer.isStarted());
- }
-
- @Test
- public void testPgEventProducerStop() throws Exception {
- when(endpoint.initJdbc()).thenReturn(connection);
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
- producer.stop();
-
- verify(connection).close();
- assertTrue(producer.isStopped());
- }
-
- @Test
- public void testPgEventProducerProcessDbThrowsInvalidStateException()
throws Exception {
- when(endpoint.initJdbc()).thenReturn(connection);
- when(connection.isClosed()).thenThrow(new SQLException("DB problem
occurred"));
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
- assertThrows(InvalidStateException.class,
- () -> producer.process(exchange));
- }
-
- @Test
- public void testPgEventProducerProcessDbConnectionClosed() throws
Exception {
- PGConnection connectionNew = mock(PGConnection.class);
-
- when(endpoint.initJdbc()).thenReturn(connection);
- when(endpoint.getDatasource()).thenReturn(dataSource);
- when(dataSource.getConnection()).thenReturn(connection, connectionNew);
- when(connection.isClosed()).thenReturn(true);
- when(exchange.getIn()).thenReturn(message);
- when(message.getBody(String.class)).thenReturn("pgevent");
- when(endpoint.getChannel()).thenReturn("camel");
-
when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(statement);
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
- producer.process(exchange);
-
- verify(statement).execute();
- }
-
- @Test
- public void testPgEventProducerProcessServerMinimumVersionMatched() throws
Exception {
- CallableStatement statement = mock(CallableStatement.class);
-
- when(endpoint.initJdbc()).thenReturn(connection);
- when(endpoint.getDatasource()).thenReturn(dataSource);
- when(connection.isClosed()).thenReturn(false);
- when(dataSource.getConnection()).thenReturn(connection);
- when(exchange.getIn()).thenReturn(message);
- when(message.getBody(String.class)).thenReturn("pgevent");
- when(endpoint.getChannel()).thenReturn("camel");
- when(connection.isServerMinimumVersion(9, 0)).thenReturn(true);
-
when(connection.prepareCall(ArgumentMatchers.anyString())).thenReturn(statement);
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
- producer.process(exchange);
-
- verify(statement).execute();
- }
-
- @Test
- public void testPgEventProducerProcessServerMinimumVersionNotMatched()
throws Exception {
- when(endpoint.initJdbc()).thenReturn(connection);
- when(endpoint.getDatasource()).thenReturn(dataSource);
- when(connection.isClosed()).thenReturn(false);
- when(dataSource.getConnection()).thenReturn(connection);
- when(exchange.getIn()).thenReturn(message);
- when(message.getBody(String.class)).thenReturn("pgevent");
- when(endpoint.getChannel()).thenReturn("camel");
- when(connection.isServerMinimumVersion(9, 0)).thenReturn(false);
- when(connection.prepareStatement("NOTIFY camel,
'pgevent'")).thenReturn(statement);
-
- PgEventProducer producer = new PgEventProducer(endpoint);
- producer.start();
- producer.process(exchange);
-
- verify(statement).execute();
- }
-}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
index 878f1ea7127..2fb4a8eb84e 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
@@ -200,6 +200,136 @@ public interface PgEventEndpointBuilderFactory {
doSetProperty("exchangePattern", exchangePattern);
return this;
}
+ /**
+ * When the consumer unexpectedly loses its connection to the database,
+ * this specifies the interval (millis) between re-connection attempts
+ * to establish a new connection.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 5000
+ * Group: consumer (advanced)
+ *
+ * @param reconnectDelay the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(int reconnectDelay) {
+ doSetProperty("reconnectDelay", reconnectDelay);
+ return this;
+ }
+ /**
+ * When the consumer unexpectedly loses its connection to the database,
+ * this specifies the interval (millis) between re-connection attempts
+ * to establish a new connection.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 5000
+ * Group: consumer (advanced)
+ *
+ * @param reconnectDelay the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(String reconnectDelay) {
+ doSetProperty("reconnectDelay", reconnectDelay);
+ return this;
+ }
+ /**
+ * To use a custom worker pool for processing the events from the
+ * database.
+ *
+ * The option is a: <code>java.util.concurrent.ExecutorService</code>
+ * type.
+ *
+ * Group: consumer (advanced)
+ *
+ * @param workerPool the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPool(ExecutorService workerPool) {
+ doSetProperty("workerPool", workerPool);
+ return this;
+ }
+ /**
+ * To use a custom worker pool for processing the events from the
+ * database.
+ *
+ * The option will be converted to a
+ * <code>java.util.concurrent.ExecutorService</code> type.
+ *
+ * Group: consumer (advanced)
+ *
+ * @param workerPool the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPool(String workerPool) {
+ doSetProperty("workerPool", workerPool);
+ return this;
+ }
+ /**
+ * Number of core threads in the worker pool for processing the events
+ * from the database.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 1
+ * Group: consumer (advanced)
+ *
+ * @param workerPoolCoreSize the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPoolCoreSize(int workerPoolCoreSize) {
+ doSetProperty("workerPoolCoreSize", workerPoolCoreSize);
+ return this;
+ }
+ /**
+ * Number of core threads in the worker pool for processing the events
+ * from the database.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 1
+ * Group: consumer (advanced)
+ *
+ * @param workerPoolCoreSize the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPoolCoreSize(String workerPoolCoreSize) {
+ doSetProperty("workerPoolCoreSize", workerPoolCoreSize);
+ return this;
+ }
+ /**
+ * Maximum number of threads in the worker pool for processing the
+ * events from the database.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer (advanced)
+ *
+ * @param workerPoolMaxSize the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPoolMaxSize(int workerPoolMaxSize) {
+ doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+ return this;
+ }
+ /**
+ * Maximum number of threads in the worker pool for processing the
+ * events from the database.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer (advanced)
+ *
+ * @param workerPoolMaxSize the value to set
+ * @return the dsl builder
+ */
+ default AdvancedPgEventEndpointConsumerBuilder workerPoolMaxSize(String workerPoolMaxSize) {
+ doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+ return this;
+ }
/**
* To connect using the given javax.sql.DataSource instead of using
* hostname and port.