This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 77b58e811ea CAMEL-20800: Use cloudant instead of LightCouch
77b58e811ea is described below
commit 77b58e811eadca9da3a08ab1eaf109041af3a06b
Author: Croway <[email protected]>
AuthorDate: Fri Jun 14 11:59:22 2024 +0200
CAMEL-20800: Use cloudant instead of LightCouch
---
.../apache/camel/catalog/components/couchdb.json | 17 +--
components/camel-couchdb/pom.xml | 13 +-
.../couchdb/CouchDbEndpointConfigurer.java | 6 +
.../couchdb/CouchDbEndpointUriFactory.java | 3 +-
.../apache/camel/component/couchdb/couchdb.json | 17 +--
.../component/couchdb/CouchDbChangesetTracker.java | 144 ---------------------
.../component/couchdb/CouchDbClientWrapper.java | 118 +++++++++++------
.../camel/component/couchdb/CouchDbConsumer.java | 78 +++++++----
.../camel/component/couchdb/CouchDbEndpoint.java | 41 ++++--
.../camel/component/couchdb/CouchDbProducer.java | 55 +++++---
.../couchdb/CouchDbChangesetTrackerTest.java | 122 -----------------
.../component/couchdb/CouchDbProducerTest.java | 54 +++++---
.../couchdb/integration/CouchDbCrudIT.java | 12 +-
.../dsl/CouchDbEndpointBuilderFactory.java | 36 ++++++
.../camel/kotlin/components/CouchdbUriDsl.kt | 18 +++
15 files changed, 324 insertions(+), 410 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
index cc5807374a3..c7a44cc4fa1 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json
@@ -43,13 +43,14 @@
"createDatabase": { "index": 4, "kind": "parameter", "displayName":
"Create Database", "group": "common", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Creates the database if
it does not already exist" },
"deletes": { "index": 5, "kind": "parameter", "displayName": "Deletes",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document deletes are published as
events" },
"heartbeat": { "index": 6, "kind": "parameter", "displayName":
"Heartbeat", "group": "consumer", "label": "consumer", "required": false,
"type": "duration", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "30000", "description": "How often to
send an empty message to keep socket alive in millis" },
- "style": { "index": 7, "kind": "parameter", "displayName": "Style",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"main_only", "description": "Specifies how many revisions are returned in the
changes array. The default, main_only, will only return the current winning
revision; all_docs will return all leaf revi [...]
- "updates": { "index": 8, "kind": "parameter", "displayName": "Updates",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document inserts\/updates are
published as events" },
- "bridgeErrorHandler": { "index": 9, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 10, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 11, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "lazyStartProducer": { "index": 12, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "password": { "index": 13, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for authenticated databases" },
- "username": { "index": 14, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username in case of authenticated databases" }
+ "maxMessagesPerPoll": { "index": 7, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 10, "description": "Gets the maximum
number of messages as a limit to poll at each polling. Gets the maximum number
of messages as a limit to poll at each polling. The default value is 10. Use 0
or a negative number to [...]
+ "style": { "index": 8, "kind": "parameter", "displayName": "Style",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"main_only", "description": "Specifies how many revisions are returned in the
changes array. The default, main_only, will only return the current winning
revision; all_docs will return all leaf revi [...]
+ "updates": { "index": 9, "kind": "parameter", "displayName": "Updates",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document inserts\/updates are
published as events" },
+ "bridgeErrorHandler": { "index": 10, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 11, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 12, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "password": { "index": 14, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for authenticated databases" },
+ "username": { "index": 15, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username in case of authenticated databases" }
}
}
diff --git a/components/camel-couchdb/pom.xml b/components/camel-couchdb/pom.xml
index 1b595ddfa6c..38843980de4 100644
--- a/components/camel-couchdb/pom.xml
+++ b/components/camel-couchdb/pom.xml
@@ -47,16 +47,9 @@
<artifactId>camel-core-processor</artifactId>
</dependency>
<dependency>
- <groupId>org.lightcouch</groupId>
- <artifactId>lightcouch</artifactId>
- <version>${lightcouch-version}</version>
- </dependency>
- <!-- prefer to use the httpclient version used by Camel components to
align
- the version -->
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>${httpclient4-version}</version>
+ <groupId>com.ibm.cloud</groupId>
+ <artifactId>cloudant</artifactId>
+ <version>0.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
diff --git
a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java
b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java
index 703779b71aa..1f2528ed587 100644
---
a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java
+++
b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java
@@ -35,6 +35,8 @@ public class CouchDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "heartbeat": target.setHeartbeat(property(camelContext,
java.time.Duration.class, value).toMillis()); return true;
case "lazystartproducer":
case "lazyStartProducer":
target.setLazyStartProducer(property(camelContext, boolean.class, value));
return true;
+ case "maxmessagesperpoll":
+ case "maxMessagesPerPoll":
target.setMaxMessagesPerPoll(property(camelContext, int.class, value)); return
true;
case "password": target.setPassword(property(camelContext,
java.lang.String.class, value)); return true;
case "style": target.setStyle(property(camelContext,
java.lang.String.class, value)); return true;
case "updates": target.setUpdates(property(camelContext,
boolean.class, value)); return true;
@@ -58,6 +60,8 @@ public class CouchDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "heartbeat": return long.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
+ case "maxmessagesperpoll":
+ case "maxMessagesPerPoll": return int.class;
case "password": return java.lang.String.class;
case "style": return java.lang.String.class;
case "updates": return boolean.class;
@@ -82,6 +86,8 @@ public class CouchDbEndpointConfigurer extends
PropertyConfigurerSupport impleme
case "heartbeat": return target.getHeartbeat();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
+ case "maxmessagesperpoll":
+ case "maxMessagesPerPoll": return target.getMaxMessagesPerPoll();
case "password": return target.getPassword();
case "style": return target.getStyle();
case "updates": return target.isUpdates();
diff --git
a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java
b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java
index ade397b722a..e0b194904f3 100644
---
a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java
+++
b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class CouchDbEndpointUriFactory extends
org.apache.camel.support.componen
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(15);
+ Set<String> props = new HashSet<>(16);
props.add("bridgeErrorHandler");
props.add("createDatabase");
props.add("database");
@@ -33,6 +33,7 @@ public class CouchDbEndpointUriFactory extends
org.apache.camel.support.componen
props.add("heartbeat");
props.add("hostname");
props.add("lazyStartProducer");
+ props.add("maxMessagesPerPoll");
props.add("password");
props.add("port");
props.add("protocol");
diff --git
a/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json
b/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json
index cc5807374a3..c7a44cc4fa1 100644
---
a/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json
+++
b/components/camel-couchdb/src/generated/resources/META-INF/org/apache/camel/component/couchdb/couchdb.json
@@ -43,13 +43,14 @@
"createDatabase": { "index": 4, "kind": "parameter", "displayName":
"Create Database", "group": "common", "label": "", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Creates the database if
it does not already exist" },
"deletes": { "index": 5, "kind": "parameter", "displayName": "Deletes",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document deletes are published as
events" },
"heartbeat": { "index": 6, "kind": "parameter", "displayName":
"Heartbeat", "group": "consumer", "label": "consumer", "required": false,
"type": "duration", "javaType": "long", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "30000", "description": "How often to
send an empty message to keep socket alive in millis" },
- "style": { "index": 7, "kind": "parameter", "displayName": "Style",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"main_only", "description": "Specifies how many revisions are returned in the
changes array. The default, main_only, will only return the current winning
revision; all_docs will return all leaf revi [...]
- "updates": { "index": 8, "kind": "parameter", "displayName": "Updates",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document inserts\/updates are
published as events" },
- "bridgeErrorHandler": { "index": 9, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
- "exceptionHandler": { "index": 10, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
- "exchangePattern": { "index": 11, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
- "lazyStartProducer": { "index": 12, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
- "password": { "index": 13, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for authenticated databases" },
- "username": { "index": 14, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username in case of authenticated databases" }
+ "maxMessagesPerPoll": { "index": 7, "kind": "parameter", "displayName":
"Max Messages Per Poll", "group": "consumer", "label": "consumer", "required":
false, "type": "integer", "javaType": "int", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": 10, "description": "Gets the maximum
number of messages as a limit to poll at each polling. Gets the maximum number
of messages as a limit to poll at each polling. The default value is 10. Use 0
or a negative number to [...]
+ "style": { "index": 8, "kind": "parameter", "displayName": "Style",
"group": "consumer", "label": "consumer", "required": false, "type": "string",
"javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ],
"deprecated": false, "autowired": false, "secret": false, "defaultValue":
"main_only", "description": "Specifies how many revisions are returned in the
changes array. The default, main_only, will only return the current winning
revision; all_docs will return all leaf revi [...]
+ "updates": { "index": 9, "kind": "parameter", "displayName": "Updates",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": true, "description": "Document inserts\/updates are
published as events" },
+ "bridgeErrorHandler": { "index": 10, "kind": "parameter", "displayName":
"Bridge Error Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Allows for bridging the consumer to the
Camel routing Error Handler, which mean any exceptions (if possible) occurred
while the Camel consumer is trying to pickup incoming [...]
+ "exceptionHandler": { "index": 11, "kind": "parameter", "displayName":
"Exception Handler", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "autowired": false, "secret": false, "description": "To
let the consumer use a custom ExceptionHandler. Notice if the option
bridgeErrorHandler is enabled then this option is not in use. By de [...]
+ "exchangePattern": { "index": 12, "kind": "parameter", "displayName":
"Exchange Pattern", "group": "consumer (advanced)", "label":
"consumer,advanced", "required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ],
"deprecated": false, "autowired": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
+ "lazyStartProducer": { "index": 13, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
+ "password": { "index": 14, "kind": "parameter", "displayName": "Password",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Password for authenticated databases" },
+ "username": { "index": 15, "kind": "parameter", "displayName": "Username",
"group": "security", "label": "security", "required": false, "type": "string",
"javaType": "java.lang.String", "deprecated": false, "autowired": false,
"secret": true, "description": "Username in case of authenticated databases" }
}
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
deleted file mode 100644
index 735e43f47ac..00000000000
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.couchdb;
-
-import java.time.Duration;
-
-import com.google.gson.JsonObject;
-import org.apache.camel.Exchange;
-import org.apache.camel.support.task.BlockingTask;
-import org.apache.camel.support.task.Tasks;
-import org.apache.camel.support.task.budget.Budgets;
-import org.lightcouch.Changes;
-import org.lightcouch.ChangesResult;
-import org.lightcouch.CouchDbException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CouchDbChangesetTracker implements Runnable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(CouchDbChangesetTracker.class);
- private static final int MAX_DB_ERROR_REPEATS = 8;
-
- private volatile boolean stopped;
- private final CouchDbClientWrapper couchClient;
- private final CouchDbEndpoint endpoint;
- private final CouchDbConsumer consumer;
- private Changes changes;
-
- public CouchDbChangesetTracker(CouchDbEndpoint endpoint, CouchDbConsumer
consumer, CouchDbClientWrapper couchClient) {
- this.endpoint = endpoint;
- this.consumer = consumer;
- this.couchClient = couchClient;
- }
-
- private void initChanges(final String sequence) {
- String since = sequence;
- if (null == since) {
- since = couchClient.getLatestUpdateSequence();
- }
- changes =
couchClient.changes().style(endpoint.getStyle()).includeDocs(true)
-
.since(since).heartBeat(endpoint.getHeartbeat()).continuousChanges();
- }
-
- @Override
- public void run() {
- String lastSequence = null;
- initChanges(null);
-
- try {
- while (!stopped) {
-
- try {
- while (changes.hasNext()) { // blocks until a feed is
received
- ChangesResult.Row feed = changes.next();
- if (feed.isDeleted() && !endpoint.isDeletes()) {
- continue;
- }
- if (!feed.isDeleted() && !endpoint.isUpdates()) {
- continue;
- }
-
- lastSequence = feed.getSeq();
- JsonObject doc = feed.getDoc();
-
- Exchange exchange =
consumer.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Created exchange [exchange={}, _id={},
seq={}", exchange, feed.getId(), lastSequence);
- }
-
- try {
- consumer.getProcessor().process(exchange);
- } catch (Exception e) {
-
consumer.getExceptionHandler().handleException("Error processing exchange.",
exchange, e);
- } finally {
- consumer.releaseExchange(exchange, false);
- }
- }
-
- stopped = true;
-
- } catch (CouchDbException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("CouchDb Exception encountered waiting for
changes! Attempting to recover...", e);
- }
- if (endpoint.isRunAllowed() || !endpoint.isShutdown() ||
!consumer.isStopped()) {
- if (!waitForStability(lastSequence)) {
- throw e;
- }
- } else {
- LOG.debug("Skipping the stability check because
shutting down or running is not allowed at the moment");
- }
- }
- }
- } catch (Exception e) {
- LOG.error("Unexpected error causing CouchDb change tracker to
exit!", e);
- }
- }
-
- private boolean waitForStability(final String lastSequence) {
- BlockingTask task = Tasks.foregroundTask()
- .withBudget(Budgets.iterationBudget()
- .withMaxIterations(MAX_DB_ERROR_REPEATS)
- .withInterval(Duration.ofSeconds(3))
- .build())
- .withName("couchdb-wait-for-stability")
- .build();
-
- return task.run(this::stabilityCheck, lastSequence);
- }
-
- private boolean stabilityCheck(String lastSequence) {
- try {
- // Fail fast operation
- couchClient.context().serverVersion();
- // reset change listener
- initChanges(lastSequence);
-
- return true;
- } catch (Exception e) {
- LOG.debug("Failed to get CouchDb server version and/or reset
change listener", e);
- }
-
- return false;
- }
-
- public void stop() {
- changes.stop();
- }
-}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java
index 7254d1eb417..e35f0fb1693 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbClientWrapper.java
@@ -16,72 +16,112 @@
*/
package org.apache.camel.component.couchdb;
-import java.io.IOException;
-
-import com.google.gson.JsonObject;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.util.EntityUtils;
-import org.lightcouch.Changes;
-import org.lightcouch.CouchDbClient;
-import org.lightcouch.CouchDbContext;
-import org.lightcouch.CouchDbException;
-import org.lightcouch.Response;
+import com.ibm.cloud.cloudant.v1.Cloudant;
+import com.ibm.cloud.cloudant.v1.model.*;
+import com.ibm.cloud.sdk.core.http.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Necessary to allow mockito to mock this client. Once LightCouch library
adds an interface for the client, this class
- * can be removed.
+ * Necessary to allow mockito to mock this client.
*/
public class CouchDbClientWrapper {
+ private static final Logger log =
LoggerFactory.getLogger(CouchDbEndpoint.class);
- private final CouchDbClient client;
+ private final Cloudant client;
+ private final String dbName;
- public CouchDbClientWrapper(CouchDbClient client) {
+ public CouchDbClientWrapper(Cloudant client, String dbName, boolean
createDatabase) {
this.client = client;
+ this.dbName = dbName;
+
+ initDatabase(createDatabase);
}
- public Response update(Object doc) {
- return client.update(doc);
+ public void initDatabase(boolean createDatabase) {
+ if (createDatabase) {
+ boolean alreadyCreated = false;
+ for (String db : client.getAllDbs().execute().getResult()) {
+ if (db.equals(dbName)) {
+ alreadyCreated = true;
+ break;
+ }
+ }
+
+ if (!alreadyCreated) {
+ PutDatabaseOptions putDatabaseOptions = new
PutDatabaseOptions.Builder()
+ .db(dbName)
+ .build();
+
+ client.putDatabase(putDatabaseOptions).execute();
+ }
+
+ log.debug("Database {} created", dbName);
+ }
}
- public Response save(Object doc) {
- return client.save(doc);
+ public Response<DocumentResult> update(Document doc) {
+ PostDocumentOptions postDocumentOptions = new
PostDocumentOptions.Builder()
+ .document(doc)
+ .db(dbName)
+ .build();
+
+ return client.postDocument(postDocumentOptions).execute();
}
- public Response remove(Object doc) {
- return client.remove(doc);
+ public Response save(Document doc) {
+ PutDocumentOptions putDocumentOptions = new
PutDocumentOptions.Builder()
+ .document(doc)
+ .docId(doc.getId())
+ .db(dbName)
+ .build();
+
+ return client.putDocument(putDocumentOptions).execute();
}
- public Changes changes() {
- return client.changes();
+ public Response removeByIdAndRev(String id, String rev) {
+ DeleteDocumentOptions deleteDocumentOptions = new
DeleteDocumentOptions.Builder()
+ .docId(id)
+ .rev(rev)
+ .db(dbName)
+ .build();
+
+ return client.deleteDocument(deleteDocumentOptions).execute();
}
- public Object get(String id) {
- return client.find(id);
+ public Response<ChangesResult> pollChanges(String style, String since,
long heartBeat, long maxMessagesPerPoll) {
+ PostChangesOptions postChangesOptions = new
PostChangesOptions.Builder()
+ .db(dbName)
+ .since(since)
+ .limit(maxMessagesPerPoll)
+ .build();
+
+ return client.postChanges(postChangesOptions).execute();
}
- public CouchDbContext context() {
- return client.context();
+ public Response get(String id) {
+ GetDocumentOptions getDocumentOptions = new
GetDocumentOptions.Builder()
+ .docId(id)
+ .db(dbName)
+ .build();
+
+ return client.getDocument(getDocumentOptions).execute();
}
/**
* In CouchDB 2.3.x, the purge_seq field type was changed from number to
string. As such, calling
- * {@link org.lightcouch.CouchDbContext#info()} was throwing an exception.
This method workarounds the issue by
- * getting the update_seq field while ignoring the purge_seq field.
+ * {@link CouchDbContext#info()} was throwing an exception. This method
workarounds the issue by getting the
+ * update_seq field while ignoring the purge_seq field.
*
* @return The latest update sequence
*/
public String getLatestUpdateSequence() {
- HttpGet getDbInfoRequest = new HttpGet(client.getDBUri());
- try {
- HttpResponse getDbInfoResponse =
client.executeRequest(getDbInfoRequest);
- String dbInfoString =
EntityUtils.toString(getDbInfoResponse.getEntity());
- JsonObject dbInfo = client.getGson().fromJson(dbInfoString,
JsonObject.class);
- return dbInfo.get("update_seq").getAsString();
- } catch (IOException e) {
- getDbInfoRequest.abort();
- throw new CouchDbException("Error executing request to fetch the
latest update sequence. ", e);
- }
+ GetDatabaseInformationOptions getDatabaseInformationOptions = new
GetDatabaseInformationOptions.Builder()
+ .db(dbName)
+ .build();
+
+ return
client.getDatabaseInformation(getDatabaseInformationOptions).execute()
+ .getResult().getUpdateSeq();
}
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 5ce86edd3dc..5a78470a087 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -16,30 +16,40 @@
*/
package org.apache.camel.component.couchdb;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
-import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.ibm.cloud.cloudant.v1.model.ChangesResult;
+import com.ibm.cloud.cloudant.v1.model.ChangesResultItem;
+import com.ibm.cloud.sdk.core.http.Response;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.support.resume.ResumeStrategyHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
org.apache.camel.component.couchdb.CouchDbConstants.COUCHDB_RESUME_ACTION;
-public class CouchDbConsumer extends DefaultConsumer implements
ResumeAware<ResumeStrategy> {
+public class CouchDbConsumer extends ScheduledBatchPollingConsumer implements
ResumeAware<ResumeStrategy> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CouchDbConsumer.class);
private final CouchDbClientWrapper couchClient;
private final CouchDbEndpoint endpoint;
private ExecutorService executor;
- private CouchDbChangesetTracker task;
private ResumeStrategy resumeStrategy;
+ private String since;
+ private String lastSequence = null;
public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper
couchClient, Processor processor) {
super(endpoint, processor);
this.couchClient = couchClient;
this.endpoint = endpoint;
+
+ since = couchClient.getLatestUpdateSequence();
}
@Override
@@ -52,40 +62,64 @@ public class CouchDbConsumer extends DefaultConsumer
implements ResumeAware<Resu
return resumeStrategy;
}
- public Exchange createExchange(String seq, String id, JsonObject obj,
boolean deleted) {
+ public Exchange createExchange(String seq, String id, ChangesResultItem
changesResultItem, boolean deleted) {
Exchange exchange = createExchange(false);
exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE,
endpoint.getDatabase());
exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq);
exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id);
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
obj.get("_rev").getAsString());
exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ?
"DELETE" : "UPDATE");
- exchange.getIn().setBody(obj);
+ exchange.getIn().setBody(new
JsonParser().parseString(changesResultItem.toString()));
return exchange;
}
+    /**
+     * Polls the changes feed once, starting after the last sequence seen, and routes one exchange for every change
+     * that passes the endpoint's deletes/updates filters.
+     *
+     * @return the number of changes returned by the server for this poll, including filtered ones
+     */
+    @Override
+    protected int poll() throws Exception {
+        Response<ChangesResult> changesResultResponse
+                = couchClient.pollChanges(endpoint.getStyle(), since, endpoint.getHeartbeat(), getMaxMessagesPerPoll());
+        ChangesResult changesResult = changesResultResponse.getResult();
+
+        for (ChangesResultItem changesResultItem : changesResult.getResults()) {
+            // Advance the cursor for every change, the messages are ordered. Doing this before filtering keeps a
+            // batch made only of filtered changes from being fetched again on every poll.
+            since = changesResultItem.getSeq();
+
+            // A change without the deleted flag is an update, so the updates filter must apply to it as well.
+            boolean deleted = Boolean.TRUE.equals(changesResultItem.isDeleted());
+            if (deleted ? !endpoint.isDeletes() : !endpoint.isUpdates()) {
+                continue;
+            }
+
+            lastSequence = changesResultItem.getSeq();
+
+            Exchange exchange = this.createExchange(lastSequence, changesResultItem.getId(), changesResultItem, deleted);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Created exchange [exchange={}, _id={}, seq={}]", exchange, changesResultItem.getId(), lastSequence);
+            }
+
+            try {
+                this.getProcessor().process(exchange);
+            } catch (Exception e) {
+                // Failures are handed to the exception handler so the remaining changes are still processed.
+                this.getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+            } finally {
+                this.releaseExchange(exchange, false);
+            }
+        }
+
+        return changesResult.getResults().size();
+    }
+
@Override
protected void doStart() throws Exception {
ResumeStrategyHelper.resume(getEndpoint().getCamelContext(), this,
resumeStrategy, COUCHDB_RESUME_ACTION);
super.doStart();
- executor =
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
endpoint.getEndpointUri(),
- 1);
- task = new CouchDbChangesetTracker(endpoint, this, couchClient);
- executor.submit(task);
-
}
@Override
- protected void doStop() throws Exception {
- super.doStop();
- if (task != null) {
- task.stop();
- }
- if (executor != null) {
-
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
- executor = null;
- }
+ public int processBatch(Queue<Object> exchanges) throws Exception {
+ return 0;
}
-
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
index f80c8bc9bd0..3e20b723169 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
@@ -19,17 +19,16 @@ package org.apache.camel.component.couchdb;
import java.net.URI;
import java.util.Map;
+import com.ibm.cloud.cloudant.v1.Cloudant;
+import com.ibm.cloud.sdk.core.security.Authenticator;
+import com.ibm.cloud.sdk.core.security.BasicAuthenticator;
+import com.ibm.cloud.sdk.core.security.NoAuthAuthenticator;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.spi.EndpointServiceLocation;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
+import org.apache.camel.spi.*;
import org.apache.camel.support.DefaultEndpoint;
-import org.lightcouch.CouchDbClient;
/**
* Consume changesets for inserts, updates and deletes in a CouchDB database,
as well as get, save, update and delete
@@ -71,6 +70,8 @@ public class CouchDbEndpoint extends DefaultEndpoint
implements EndpointServiceL
private boolean deletes = true;
@UriParam(label = "consumer", defaultValue = "true")
private boolean updates = true;
+ @UriParam(label = "consumer", defaultValue = "10")
+ private int maxMessagesPerPoll = 10;
public CouchDbEndpoint() {
}
@@ -119,6 +120,7 @@ public class CouchDbEndpoint extends DefaultEndpoint
implements EndpointServiceL
@Override
public Consumer createConsumer(Processor processor) throws Exception {
CouchDbConsumer answer = new CouchDbConsumer(this, createClient(),
processor);
+ answer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
configureConsumer(answer);
return answer;
}
@@ -129,8 +131,17 @@ public class CouchDbEndpoint extends DefaultEndpoint
implements EndpointServiceL
}
protected CouchDbClientWrapper createClient() {
- return new CouchDbClientWrapper(
- new CouchDbClient(database, createDatabase, protocol,
hostname, port, username, password));
+ Authenticator authenticator;
+ if (username == null) {
+ authenticator = new NoAuthAuthenticator();
+ } else {
+ authenticator = new BasicAuthenticator(username, password);
+ }
+
+ Cloudant cloudant = new Cloudant("camel-couchdb", authenticator);
+ cloudant.setServiceUrl(getServiceUrl());
+
+ return new CouchDbClientWrapper(cloudant, database, createDatabase);
}
public String getProtocol() {
@@ -255,4 +266,18 @@ public class CouchDbEndpoint extends DefaultEndpoint
implements EndpointServiceL
public void setUpdates(boolean updates) {
this.updates = updates;
}
+
+ public int getMaxMessagesPerPoll() {
+ return maxMessagesPerPoll;
+ }
+
+ /**
+ * Sets the maximum number of messages as a limit to poll at each polling.
+ * <p/>
+ * The default value is 10. Use 0 or a negative number to set it as unlimited.
+ */
+ public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+ this.maxMessagesPerPoll = maxMessagesPerPoll;
+ }
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java
index f50784bd773..914f7d605bd 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbProducer.java
@@ -16,15 +16,19 @@
*/
package org.apache.camel.component.couchdb;
+import java.util.UUID;
+
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
+import com.ibm.cloud.cloudant.v1.model.Document;
+import com.ibm.cloud.cloudant.v1.model.DocumentResult;
+import com.ibm.cloud.sdk.core.http.Response;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
-import org.lightcouch.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,28 +48,28 @@ public class CouchDbProducer extends DefaultProducer {
JsonElement json = getBodyAsJsonElement(exchange);
String operation =
exchange.getIn().getHeader(CouchDbConstants.HEADER_METHOD, String.class);
if (ObjectHelper.isEmpty(operation)) {
- Response save = saveJsonElement(json);
+ Response<DocumentResult> save = saveJsonElement(json);
if (save == null) {
throw new CouchDbException("Could not save document [unknown
reason]", exchange);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Document saved [_id={}, _rev={}]", save.getId(),
save.getRev());
+ LOG.trace("Document saved [_id={}, _rev={}]",
save.getResult().getId(), save.getResult().getRev());
}
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
save.getRev());
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID,
save.getId());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
save.getResult().getRev());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID,
save.getResult().getId());
} else {
if
(operation.equalsIgnoreCase(CouchDbOperations.DELETE.toString())) {
- Response delete = deleteJsonElement(json);
+ Response<DocumentResult> delete = deleteJsonElement(json);
if (delete == null) {
throw new CouchDbException("Could not delete document
[unknown reason]", exchange);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Document saved [_id={}, _rev={}]",
delete.getId(), delete.getRev());
+ LOG.trace("Document saved [_id={}, _rev={}]",
delete.getResult().getId(), delete.getResult().getRev());
}
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
delete.getRev());
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID,
delete.getId());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
delete.getResult().getRev());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID,
delete.getResult().getId());
}
if (operation.equalsIgnoreCase(CouchDbOperations.GET.toString())) {
String docId =
exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_ID, String.class);
@@ -93,23 +97,37 @@ public class CouchDbProducer extends DefaultProducer {
}
} else if (body instanceof JsonElement) {
return (JsonElement) body;
+ } else if (body instanceof Document document) {
+ return new JsonParser().parse(document.toString());
} else {
throw new InvalidPayloadException(exchange, body != null ?
body.getClass() : null);
}
}
- private Response saveJsonElement(JsonElement json) {
- Response save;
+ private Response<DocumentResult> saveJsonElement(JsonElement json) {
+ Response save = null;
if (json instanceof JsonObject) {
JsonObject obj = (JsonObject) json;
+ Document.Builder documentBuilder = new Document.Builder();
+ for (String key : obj.keySet()) {
+ if (key.equals("_id")) {
+ documentBuilder.id(obj.get(key).getAsString());
+ } else {
+ documentBuilder.add(key, obj.get(key));
+ }
+ }
+
+ Document document = documentBuilder.build();
+ if (document.getId() == null) {
+ document.setId(UUID.randomUUID().toString());
+ }
if (obj.get("_rev") == null) {
- save = couchClient.save(json);
+ save = couchClient.save(document);
} else {
- save = couchClient.update(json);
+ save = couchClient.update(document);
}
- } else {
- save = couchClient.save(json);
}
+
return save;
}
@@ -117,16 +135,17 @@ public class CouchDbProducer extends DefaultProducer {
Response delete;
if (json instanceof JsonObject) {
JsonObject obj = (JsonObject) json;
- delete = couchClient.remove(obj);
+ delete =
couchClient.removeByIdAndRev(obj.get("_id").getAsString(),
obj.get("_rev").getAsString());
} else {
- delete = couchClient.remove(json);
+ delete =
couchClient.removeByIdAndRev(json.getAsJsonObject().get("_id").getAsString(),
+ json.getAsJsonObject().get("_rev").getAsString());
}
return delete;
}
private Object getElement(String id) {
Object response;
- response = couchClient.get(id);
+ response = couchClient.get(id).getResult();
return response;
}
}
diff --git
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
deleted file mode 100644
index 7ae1c4999e0..00000000000
---
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.couchdb;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.lightcouch.Changes;
-import org.lightcouch.ChangesResult.Row;
-import org.lightcouch.CouchDbContext;
-import org.lightcouch.CouchDbInfo;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class CouchDbChangesetTrackerTest {
-
- @Mock
- private Changes changes;
- @Mock
- private CouchDbClientWrapper client;
- @Mock
- private CouchDbEndpoint endpoint;
- @Mock
- private CouchDbConsumer consumer;
- @Mock
- private CouchDbContext context;
- @Mock
- private CouchDbInfo info;
- @Mock
- private Row row3;
- @Mock
- private Row row2;
- @Mock
- private Row row1;
- @Mock
- private Exchange exchange1;
- @Mock
- private Exchange exchange2;
- @Mock
- private Exchange exchange3;
- @Mock
- private Processor processor;
-
- private CouchDbChangesetTracker tracker;
-
- @BeforeEach
- public void before() {
- when(endpoint.isUpdates()).thenReturn(true);
-
- when(client.getLatestUpdateSequence()).thenReturn("100");
- when(client.changes()).thenReturn(changes);
- when(changes.continuousChanges()).thenReturn(changes);
- when(changes.includeDocs(true)).thenReturn(changes);
- when(changes.since(anyString())).thenReturn(changes);
- when(changes.heartBeat(anyLong())).thenReturn(changes);
- when(changes.style(ArgumentMatchers.isNull())).thenReturn(changes);
-
- when(row1.getSeq()).thenReturn("seq1");
-
- when(row1.getId()).thenReturn("id1");
-
- tracker = new CouchDbChangesetTracker(endpoint, consumer, client);
- }
-
- @Test
- void testExchangeCreatedWithCorrectProperties() throws Exception {
- when(row2.getSeq()).thenReturn("seq2");
- when(row3.getSeq()).thenReturn("seq3");
- when(row2.getId()).thenReturn("id2");
- when(row3.getId()).thenReturn("id3");
- when(changes.hasNext()).thenReturn(true, true, true, false);
- when(changes.next()).thenReturn(row1, row2, row3);
- when(consumer.createExchange("seq1", "id1", null,
false)).thenReturn(exchange1);
- when(consumer.createExchange("seq2", "id2", null,
false)).thenReturn(exchange2);
- when(consumer.createExchange("seq3", "id3", null,
false)).thenReturn(exchange3);
- when(consumer.getProcessor()).thenReturn(processor);
-
- tracker.run();
-
- verify(consumer).createExchange("seq1", "id1", null, false);
- verify(processor).process(exchange1);
- verify(consumer).createExchange("seq2", "id2", null, false);
- verify(processor).process(exchange2);
- verify(consumer).createExchange("seq3", "id3", null, false);
- verify(processor).process(exchange3);
- }
-
- @Test
- void testProcessorInvoked() throws Exception {
- when(changes.hasNext()).thenReturn(true, false);
- when(changes.next()).thenReturn(row1);
- when(consumer.getProcessor()).thenReturn(processor);
-
- tracker.run();
-
- verify(consumer).createExchange("seq1", "id1", null, false);
- verify(processor).process(ArgumentMatchers.isNull());
- }
-}
diff --git
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java
index 4e9ffea65eb..593fd6c85c6 100644
---
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java
+++
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbProducerTest.java
@@ -18,15 +18,17 @@ package org.apache.camel.component.couchdb;
import java.util.UUID;
-import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import com.ibm.cloud.cloudant.v1.model.Document;
+import com.ibm.cloud.cloudant.v1.model.DocumentResult;
+import com.ibm.cloud.sdk.core.http.Response;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.lightcouch.Response;
+import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -35,8 +37,7 @@ import org.mockito.stubbing.Answer;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
public class CouchDbProducerTest {
@@ -53,8 +54,8 @@ public class CouchDbProducerTest {
@Mock
private Message msg;
- @Mock
- private Response response;
+ @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+ private Response<DocumentResult> response;
private CouchDbProducer producer;
@@ -77,14 +78,17 @@ public class CouchDbProducerTest {
String id = UUID.randomUUID().toString();
String rev = UUID.randomUUID().toString();
- JsonObject doc = new JsonObject();
- doc.addProperty("_id", id);
- doc.addProperty("_rev", rev);
+ Document doc = new Document.Builder()
+ .add("_rev", rev)
+ .id(id)
+ .build();
+ DocumentResult documentResult = mock(DocumentResult.class,
Answers.RETURNS_DEEP_STUBS);
when(msg.getMandatoryBody()).thenReturn(doc);
when(client.update(doc)).thenReturn(response);
- when(response.getId()).thenReturn(id);
- when(response.getRev()).thenReturn(rev);
+ when(response.getResult()).thenReturn(documentResult);
+ when(response.getResult().getId()).thenReturn(id);
+ when(response.getResult().getRev()).thenReturn(rev);
producer.process(exchange);
verify(msg).setHeader(CouchDbConstants.HEADER_DOC_ID, id);
@@ -104,15 +108,18 @@ public class CouchDbProducerTest {
String id = UUID.randomUUID().toString();
String rev = UUID.randomUUID().toString();
- JsonObject doc = new JsonObject();
- doc.addProperty("_id", id);
- doc.addProperty("_rev", rev);
+ Document doc = new Document.Builder()
+ .id(id)
+ .add("_rev", rev)
+ .build();
+ DocumentResult documentResult = mock(DocumentResult.class,
Answers.RETURNS_DEEP_STUBS);
when(msg.getHeader(CouchDbConstants.HEADER_METHOD,
String.class)).thenReturn("DELETE");
when(msg.getMandatoryBody()).thenReturn(doc);
- when(client.remove(doc)).thenReturn(response);
- when(response.getId()).thenReturn(id);
- when(response.getRev()).thenReturn(rev);
+ when(client.removeByIdAndRev(id, rev)).thenReturn(response);
+ when(response.getResult()).thenReturn(documentResult);
+ when(response.getResult().getId()).thenReturn(id);
+ when(response.getResult().getRev()).thenReturn(rev);
producer.process(exchange);
verify(msg).setHeader(CouchDbConstants.HEADER_DOC_ID, id);
@@ -142,12 +149,17 @@ public class CouchDbProducerTest {
@Override
public Response answer(InvocationOnMock invocation) {
- assertTrue(invocation.getArguments()[0] instanceof JsonElement,
- invocation.getArguments()[0].getClass() + " but wanted
" + JsonElement.class);
- return new Response();
+ assertTrue(invocation.getArguments()[0] instanceof Document,
+ invocation.getArguments()[0].getClass() + " but wanted
" + Document.class);
+
+ DocumentResult documentResult = mock(DocumentResult.class);
+ Response response = mock(Response.class);
+ when(response.getResult()).thenReturn(documentResult);
+
+ return response;
}
});
producer.process(exchange);
- verify(client).save(any(JsonObject.class));
+ verify(client).save(any(Document.class));
}
}
diff --git
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java
index 9307f2ec34a..0f45bbcd4d6 100644
---
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java
+++
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/integration/CouchDbCrudIT.java
@@ -29,20 +29,13 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.couchdb.CouchDbConstants;
import org.apache.camel.component.couchdb.CouchDbOperations;
import org.apache.camel.component.mock.MockEndpoint;
-import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.*;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
-import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
-import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.*;
@TestInstance(Lifecycle.PER_CLASS)
public class CouchDbCrudIT extends CouchDbTestSupport {
@@ -103,6 +96,7 @@ public class CouchDbCrudIT extends CouchDbTestSupport {
// Creating a document should trigger an update notification
mockUpdateNotifications.expectedHeaderReceived(CouchDbConstants.HEADER_METHOD,
"UPDATE");
mockUpdateNotifications.expectedMessageCount(1);
+
createExchange = template.request(couchDbIn, e ->
e.getMessage().setBody(testDocument));
assertNotNull(getDocumentId(createExchange));
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java
index 2084b5d278f..8d68ecdab61 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java
@@ -134,6 +134,42 @@ public interface CouchDbEndpointBuilderFactory {
doSetProperty("heartbeat", heartbeat);
return this;
}
+ /**
+ * Sets the maximum number of messages as a limit to poll at each
+ * polling. The default value is 10. Use 0 or a negative number to
+ * set it as unlimited.
+ *
+ * The option is a: <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer
+ *
+ * @param maxMessagesPerPoll the value to set
+ * @return the dsl builder
+ */
+ default CouchDbEndpointConsumerBuilder maxMessagesPerPoll(int
maxMessagesPerPoll) {
+ doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
+ return this;
+ }
+ /**
+ * Sets the maximum number of messages as a limit to poll at each
+ * polling. The default value is 10. Use 0 or a negative number to
+ * set it as unlimited.
+ *
+ * The option will be converted to a <code>int</code> type.
+ *
+ * Default: 10
+ * Group: consumer
+ *
+ * @param maxMessagesPerPoll the value to set
+ * @return the dsl builder
+ */
+ default CouchDbEndpointConsumerBuilder maxMessagesPerPoll(String
maxMessagesPerPoll) {
+ doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
+ return this;
+ }
/**
* Specifies how many revisions are returned in the changes array. The
* default, main_only, will only return the current winning revision;
diff --git
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt
index c85f495ac98..339623f21db 100644
---
a/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt
+++
b/dsl/camel-kotlin-api/src/generated/kotlin/org/apache/camel/kotlin/components/CouchdbUriDsl.kt
@@ -125,6 +125,24 @@ public class CouchdbUriDsl(
it.property("heartbeat", heartbeat)
}
+ /**
+ * Sets the maximum number of messages as a limit to poll at each polling. The default value is
+ * 10. Use 0 or a negative number to set it as unlimited.
+ */
+ public fun maxMessagesPerPoll(maxMessagesPerPoll: String) {
+ it.property("maxMessagesPerPoll", maxMessagesPerPoll)
+ }
+
+ /**
+ * Sets the maximum number of messages as a limit to poll at each polling. The default value is
+ * 10. Use 0 or a negative number to set it as unlimited.
+ */
+ public fun maxMessagesPerPoll(maxMessagesPerPoll: Int) {
+ it.property("maxMessagesPerPoll", maxMessagesPerPoll.toString())
+ }
+
/**
* Specifies how many revisions are returned in the changes array. The
default, main_only, will
* only return the current winning revision; all_docs will return all leaf
revisions (including