This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6d24ee8 CAMEL-17121: converted mongodb-gridfs to the repeatable tasks
(#6356)
6d24ee8 is described below
commit 6d24ee80f25df4d33ff02e4759e7be018dd5a0ee
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Sat Oct 30 07:32:01 2021 +0200
CAMEL-17121: converted mongodb-gridfs to the repeatable tasks (#6356)
* CAMEL-17121: added support for unlimited iterations
* CAMEL-17121: converted mongodb-gridfs to the repeatable tasks
---
.../component/mongodb/gridfs/GridFsConsumer.java | 179 ++++++++++++---------
.../task/budget/IterationBoundedBudget.java | 12 +-
2 files changed, 111 insertions(+), 80 deletions(-)
diff --git
a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
index acfc588..fe7639e 100644
---
a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
+++
b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.mongodb.gridfs;
import java.io.InputStream;
+import java.time.Duration;
import java.util.Date;
import java.util.concurrent.ExecutorService;
@@ -32,7 +33,10 @@ import com.mongodb.client.model.Updates;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.util.IOHelper;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
import org.bson.Document;
import org.bson.conversions.Bson;
@@ -71,7 +75,6 @@ public class GridFsConsumer extends DefaultConsumer
implements Runnable {
@Override
public void run() {
- MongoCursor<GridFSFile> cursor = null;
Date fromDate = null;
QueryStrategy queryStrategy = endpoint.getQueryStrategy();
@@ -103,92 +106,112 @@ public class GridFsConsumer extends DefaultConsumer
implements Runnable {
} else if (usesTimestamp) {
fromDate = new Date();
}
- try {
- Thread.sleep(endpoint.getInitialDelay());
- while (isStarted()) {
- if (cursor == null) {
- String queryString = endpoint.getQuery();
- Bson query = null;
- if (queryString != null) {
- query = Document.parse(queryString);
- }
- if (usesTimestamp) {
- Bson uploadDateFilter =
Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE, fromDate);
- if (query == null) {
- query = uploadDateFilter;
- } else {
- query = Filters.and(query, uploadDateFilter);
- }
- }
- if (usesAttribute) {
- Bson fileAttributeNameFilter =
Filters.eq(endpoint.getFileAttributeName(), null);
- if (query == null) {
- query = fileAttributeNameFilter;
- } else {
- query = Filters.and(query,
fileAttributeNameFilter);
- }
- }
- cursor = endpoint.getGridFsBucket().find(query).cursor();
+
+ BlockingTask task = Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+
.withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+ .withInterval(Duration.ofMillis(endpoint.getDelay()))
+
.withInitialDelay(Duration.ofMillis(endpoint.getInitialDelay()))
+ .build())
+ .build();
+
+ MongoCollection<Document> finalPtsCollection = ptsCollection;
+ Date finalFromDate = fromDate;
+ Document finalPersistentTimestamp = persistentTimestamp;
+ task.run(() -> processCollection(finalFromDate, usesTimestamp,
persistsTimestamp, usesAttribute, finalPtsCollection,
+ finalPersistentTimestamp));
+ }
+
+ private boolean processCollection(
+ Date fromDate, boolean usesTimestamp, boolean persistsTimestamp,
boolean usesAttribute,
+ final MongoCollection<Document> ptsCollection, final Document
persistentTimestamp) {
+
+ if (!isStarted()) {
+ return false;
+ }
+
+ try (MongoCursor<GridFSFile> cursor =
getGridFSFileMongoCursor(fromDate, usesTimestamp, usesAttribute)) {
+ boolean dateModified = false;
+
+ while (cursor.hasNext() && isStarted()) {
+ GridFSFile file = cursor.next();
+ GridFSFile fOrig = file;
+ if (usesAttribute) {
+ FindOneAndUpdateOptions options = new
FindOneAndUpdateOptions();
+ options.returnDocument(ReturnDocument.AFTER);
+ Bson filter = Filters.and(eq("_id", file.getId()),
eq(endpoint.getFileAttributeName(), null));
+ Bson update = Updates.set(endpoint.getFileAttributeName(),
GRIDFS_FILE_ATTRIBUTE_PROCESSING);
+ fOrig =
endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
}
- boolean dateModified = false;
- while (cursor.hasNext() && isStarted()) {
- GridFSFile file = cursor.next();
- GridFSFile forig = file;
- if (usesAttribute) {
- FindOneAndUpdateOptions options = new
FindOneAndUpdateOptions();
- options.returnDocument(ReturnDocument.AFTER);
- Bson filter = Filters.and(eq("_id", file.getId()),
eq(endpoint.getFileAttributeName(), null));
- Bson update =
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_PROCESSING);
- forig =
endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
- }
- if (forig != null) {
- Exchange exchange = createExchange(true);
- GridFSDownloadStream downloadStream =
endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
- file = downloadStream.getGridFSFile();
-
- Document metadata = file.getMetadata();
- if (metadata != null) {
- String contentType =
metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class);
- if (contentType != null) {
-
exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType);
- }
-
exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson());
+ if (fOrig != null) {
+ Exchange exchange = createExchange(true);
+ GridFSDownloadStream downloadStream =
endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
+ file = downloadStream.getGridFSFile();
+
+ Document metadata = file.getMetadata();
+ if (metadata != null) {
+ String contentType =
metadata.get(GRIDFS_FILE_KEY_CONTENT_TYPE, String.class);
+ if (contentType != null) {
+
exchange.getIn().setHeader(Exchange.FILE_CONTENT_TYPE, contentType);
}
+
exchange.getIn().setHeader(GridFsEndpoint.GRIDFS_METADATA, metadata.toJson());
+ }
- exchange.getIn().setHeader(Exchange.FILE_LENGTH,
file.getLength());
-
exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED, file.getUploadDate());
- exchange.getIn().setBody(downloadStream,
InputStream.class);
- try {
- getProcessor().process(exchange);
- if (usesAttribute) {
- Bson update =
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE);
-
endpoint.getFilesCollection().findOneAndUpdate(eq("_id", forig.getId()),
update);
- }
- if (usesTimestamp) {
- if (file.getUploadDate().compareTo(fromDate) >
0) {
- fromDate = file.getUploadDate();
- dateModified = true;
- }
- }
- } catch (Exception e) {
- // ignore
+ exchange.getIn().setHeader(Exchange.FILE_LENGTH,
file.getLength());
+ exchange.getIn().setHeader(Exchange.FILE_LAST_MODIFIED,
file.getUploadDate());
+ exchange.getIn().setBody(downloadStream,
InputStream.class);
+ try {
+ getProcessor().process(exchange);
+ if (usesAttribute) {
+ Bson update =
Updates.set(endpoint.getFileAttributeName(), GRIDFS_FILE_ATTRIBUTE_DONE);
+
endpoint.getFilesCollection().findOneAndUpdate(eq("_id", fOrig.getId()),
update);
+ }
+ if (usesTimestamp &&
file.getUploadDate().compareTo(fromDate) > 0) {
+ fromDate = file.getUploadDate();
+ dateModified = true;
}
+ } catch (Exception e) {
+ // ignore
}
}
+ }
- if (persistsTimestamp && dateModified) {
- Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY,
fromDate);
- ptsCollection.findOneAndUpdate(eq("_id",
persistentTimestamp.getObjectId("_id")), update);
- }
+ if (persistsTimestamp && dateModified) {
+ Bson update = Updates.set(PERSISTENT_TIMESTAMP_KEY, fromDate);
+ ptsCollection.findOneAndUpdate(eq("_id",
persistentTimestamp.getObjectId("_id")), update);
+ }
+ }
- cursor = null;
- Thread.sleep(endpoint.getDelay());
+ return false;
+ }
+
+ private MongoCursor<GridFSFile> getGridFSFileMongoCursor(Date fromDate,
boolean usesTimestamp, boolean usesAttribute) {
+ String queryString = endpoint.getQuery();
+ Bson query = getBsonDocument(fromDate, usesTimestamp, usesAttribute,
queryString);
+ return endpoint.getGridFsBucket().find(query).cursor();
+ }
+
+ private Bson getBsonDocument(Date fromDate, boolean usesTimestamp, boolean
usesAttribute, String queryString) {
+ Bson query = null;
+ if (queryString != null) {
+ query = Document.parse(queryString);
+ }
+ if (usesTimestamp) {
+ Bson uploadDateFilter = Filters.gt(GRIDFS_FILE_KEY_UPLOAD_DATE,
fromDate);
+ if (query == null) {
+ query = uploadDateFilter;
+ } else {
+ query = Filters.and(query, uploadDateFilter);
}
- } catch (Exception e1) {
- // ignore
}
- if (cursor != null) {
- IOHelper.close(cursor);
+ if (usesAttribute) {
+ Bson fileAttributeNameFilter =
Filters.eq(endpoint.getFileAttributeName(), null);
+ if (query == null) {
+ query = fileAttributeNameFilter;
+ } else {
+ query = Filters.and(query, fileAttributeNameFilter);
+ }
}
+ return query;
}
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
index 86ccee6..1225846 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/IterationBoundedBudget.java
@@ -18,6 +18,8 @@
package org.apache.camel.support.task.budget;
public class IterationBoundedBudget implements IterationBudget {
+ public static final int UNLIMITED_ITERATIONS = -1;
+
private final long initialDelay;
private final long interval;
private final int maxIterations;
@@ -51,7 +53,9 @@ public class IterationBoundedBudget implements
IterationBudget {
@Override
public boolean next() {
if (canContinue()) {
- iterations++;
+ if (iterations != UNLIMITED_ITERATIONS) {
+ iterations++;
+ }
return true;
}
@@ -61,6 +65,10 @@ public class IterationBoundedBudget implements
IterationBudget {
@Override
public boolean canContinue() {
- return iterations < maxIterations;
+ if (maxIterations != UNLIMITED_ITERATIONS) {
+ return iterations < maxIterations;
+ }
+
+ return true;
}
}