This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 332cf7c CAMEL-16750: camel-file - file idempotent read-locks should
be more roboust if acquring read-lock fails due to an exception such as when
using a shared datbase and there is a duplicate key violation as another node
has the lock.
332cf7c is described below
commit 332cf7cb8a2de90096ff858247c60c0644f5507d
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 24 09:25:27 2021 +0200
CAMEL-16750: camel-file - file idempotent read-locks should be more roboust
if acquring read-lock fails due to an exception such as when using a shared
datbase and there is a duplicate key violation as another node has the lock.
---
...dempotentChangedRepositoryReadLockStrategy.java | 25 ++++++++++++++--------
...IdempotentRenameRepositoryReadLockStrategy.java | 11 ++++++++--
.../FileIdempotentRepositoryReadLockStrategy.java | 17 ++++++++++-----
3 files changed, 37 insertions(+), 16 deletions(-)
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
index 75003fe..09215d0 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentChangedRepositoryReadLockStrategy.java
@@ -86,7 +86,14 @@ public class FileIdempotentChangedRepositoryReadLockStrategy
extends ServiceSupp
// check if we can begin on this file
String key = asKey(file);
- boolean answer = idempotentRepository.add(exchange, key);
+ boolean answer = false;
+ try {
+ answer = idempotentRepository.add(exchange, key);
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot acquire read lock due to " + e.getMessage()
+ ". Will skip the file: " + file, e);
+ }
+ }
if (!answer) {
// another node is processing the file so skip
CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read
lock. Will skip the file: " + file);
@@ -96,7 +103,7 @@ public class FileIdempotentChangedRepositoryReadLockStrategy
extends ServiceSupp
// if we acquired during idempotent then check changed also
answer = changed.acquireExclusiveReadLock(operations, file,
exchange);
if (!answer) {
- // remove from idempontent as we did not acquire it from
changed
+ // remove from idempotent as we did not acquire it from changed
idempotentRepository.remove(exchange, key);
}
}
@@ -119,23 +126,23 @@ public class
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
if (removeOnRollback) {
idempotentRepository.remove(exchange, key);
} else {
- // okay we should not remove then confirm it instead
+ // okay we should not remove then confirm instead
idempotentRepository.confirm(exchange, key);
}
try {
changed.releaseExclusiveReadLockOnRollback(operations, file,
exchange);
} catch (Exception e) {
- LOG.warn("Error during releasing exclusive readlock on
rollback. This exception is ignored.", e);
+ LOG.warn("Error during releasing exclusive read lock on
rollback. This exception is ignored.", e);
}
};
if (readLockIdempotentReleaseDelay > 0 &&
readLockIdempotentReleaseExecutorService != null) {
- LOG.debug("Scheduling readlock release task to run asynchronous
delayed after {} millis",
+ LOG.debug("Scheduling read lock release task to run asynchronous
delayed after {} millis",
readLockIdempotentReleaseDelay);
readLockIdempotentReleaseExecutorService.schedule(r,
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
} else if (readLockIdempotentReleaseDelay > 0) {
- LOG.debug("Delaying readlock release task {} millis",
readLockIdempotentReleaseDelay);
+ LOG.debug("Delaying read lock release task {} millis",
readLockIdempotentReleaseDelay);
Thread.sleep(readLockIdempotentReleaseDelay);
r.run();
} else {
@@ -159,16 +166,16 @@ public class
FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupp
try {
changed.releaseExclusiveReadLockOnCommit(operations, file,
exchange);
} catch (Exception e) {
- LOG.warn("Error during releasing exclusive readlock on
rollback. This exception is ignored.", e);
+ LOG.warn("Error during releasing exclusive read lock on
rollback. This exception is ignored.", e);
}
};
if (readLockIdempotentReleaseDelay > 0 &&
readLockIdempotentReleaseExecutorService != null) {
- LOG.debug("Scheduling readlock release task to run asynchronous
delayed after {} millis",
+ LOG.debug("Scheduling read lock release task to run asynchronous
delayed after {} millis",
readLockIdempotentReleaseDelay);
readLockIdempotentReleaseExecutorService.schedule(r,
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
} else if (readLockIdempotentReleaseDelay > 0) {
- LOG.debug("Delaying readlock release task {} millis",
readLockIdempotentReleaseDelay);
+ LOG.debug("Delaying read lock release task {} millis",
readLockIdempotentReleaseDelay);
Thread.sleep(readLockIdempotentReleaseDelay);
r.run();
} else {
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
index 6cf4cfa..82d653d 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRenameRepositoryReadLockStrategy.java
@@ -78,7 +78,14 @@ public class FileIdempotentRenameRepositoryReadLockStrategy
extends ServiceSuppo
// check if we can begin on this file
String key = asKey(file);
- boolean answer = idempotentRepository.add(exchange, key);
+ boolean answer = false;
+ try {
+ answer = idempotentRepository.add(exchange, key);
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot acquire read lock due to " + e.getMessage()
+ ". Will skip the file: " + file, e);
+ }
+ }
if (!answer) {
// another node is processing the file so skip
CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read
lock. Will skip the file: " + file);
@@ -88,7 +95,7 @@ public class FileIdempotentRenameRepositoryReadLockStrategy
extends ServiceSuppo
// if we acquired during idempotent then check rename also
answer = rename.acquireExclusiveReadLock(operations, file,
exchange);
if (!answer) {
- // remove from idempontent as we did not acquire it from
changed
+ // remove from idempotent as we did not acquire it from changed
idempotentRepository.remove(exchange, key);
}
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
index 5316f55..dacf232 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/strategy/FileIdempotentRepositoryReadLockStrategy.java
@@ -75,7 +75,14 @@ public class FileIdempotentRepositoryReadLockStrategy
extends ServiceSupport
// check if we can begin on this file
String key = asKey(file);
- boolean answer = idempotentRepository.add(exchange, key);
+ boolean answer = false;
+ try {
+ answer = idempotentRepository.add(exchange, key);
+ } catch (Exception e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cannot acquire read lock due to " + e.getMessage()
+ ". Will skip the file: " + file, e);
+ }
+ }
if (!answer) {
// another node is processing the file so skip
CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read
lock. Will skip the file: " + file);
@@ -105,11 +112,11 @@ public class FileIdempotentRepositoryReadLockStrategy
extends ServiceSupport
};
if (readLockIdempotentReleaseDelay > 0 &&
readLockIdempotentReleaseExecutorService != null) {
- LOG.debug("Scheduling readlock release task to run asynchronous
delayed after {} millis",
+ LOG.debug("Scheduling read lock release task to run asynchronous
delayed after {} millis",
readLockIdempotentReleaseDelay);
readLockIdempotentReleaseExecutorService.schedule(r,
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
} else if (readLockIdempotentReleaseDelay > 0) {
- LOG.debug("Delaying readlock release task {} millis",
readLockIdempotentReleaseDelay);
+ LOG.debug("Delaying read lock release task {} millis",
readLockIdempotentReleaseDelay);
Thread.sleep(readLockIdempotentReleaseDelay);
r.run();
} else {
@@ -132,11 +139,11 @@ public class FileIdempotentRepositoryReadLockStrategy
extends ServiceSupport
};
if (readLockIdempotentReleaseDelay > 0 &&
readLockIdempotentReleaseExecutorService != null) {
- LOG.debug("Scheduling readlock release task to run asynchronous
delayed after {} millis",
+ LOG.debug("Scheduling read lock release task to run asynchronous
delayed after {} millis",
readLockIdempotentReleaseDelay);
readLockIdempotentReleaseExecutorService.schedule(r,
readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
} else if (readLockIdempotentReleaseDelay > 0) {
- LOG.debug("Delaying readlock release task {} millis",
readLockIdempotentReleaseDelay);
+ LOG.debug("Delaying read lock release task {} millis",
readLockIdempotentReleaseDelay);
Thread.sleep(readLockIdempotentReleaseDelay);
r.run();
} else {