Hi
I’ve been looking at how to use camel-jms to consume XML from a persistent
queue, do some minor transformation/validation of the XML and then either
write a single file or route a message to an error queue.
The ultimate goal is to ensure that ALL messages are consumed and either a
file is created ONCE or an error is routed ONCE to an error queue.
Not wishing to use XA transactions by default, I opted to try an approach
using local JMS transactions, with the Camel file component producing the
files and an idempotent repository guarding against the re-processing of
redelivered, duplicate messages.
I’d appreciate views on whether the combination of the Camel file producer,
native JMS transactions and an idempotentRepository simply cannot achieve
the desired goal, and whether I must therefore go down the XA route.
My preference was a) not to have to add the additional complexity/overhead
of global transactions unless absolutely necessary and b) to use the Camel
file component as the file producer.
Whilst I can see the original, native JMS transaction based approach working
in part, even when considering just the sunny-day scenario of successful file
production the approach does not guarantee that a file will be produced
exactly ONCE. A JVM crash at an inopportune moment between the idempotent
repository record insertion and the JmsTransactionManager transaction commit
can lead to file production being skipped (the risk of breaking this
guarantee is niche but it does, nonetheless, seem to exist).
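To make the window concrete, here is a minimal plain-Java sketch of the sequence. It is only a simulation: no Camel or JMS classes are involved, and the class, field and method names (EagerCrashDemo, deliver, filesProducedFor) are made up for illustration. It assumes the repository key is recorded before the file is written and that the repository survives the crash.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation of the crash window; no Camel or JMS classes are used.
// All names here are hypothetical and exist only for this illustration.
class EagerCrashDemo {

    // Stands in for the idempotent store on disk; survives a simulated crash.
    static final Set<String> repository = new HashSet<>();
    // Stands in for the output directory; survives a simulated crash.
    static final List<String> filesWritten = new ArrayList<>();

    // One delivery attempt. Returns true if the JMS transaction "committed",
    // false if the JVM "crashed" first, in which case the broker redelivers.
    static boolean deliver(String messageId, boolean crashAfterRepositoryAdd) {
        // The key is recorded before the message is processed.
        if (!repository.add(messageId)) {
            // Seen before: filtered as a duplicate, yet still consumed.
            return true;
        }
        if (crashAfterRepositoryAdd) {
            // Key is stored, no file written, transaction never committed.
            return false;
        }
        filesWritten.add(messageId + ".xml");
        return true; // transaction commit
    }

    // Delivers until committed, optionally crashing on the first attempt,
    // and reports how many files ended up being written.
    static int filesProducedFor(String messageId, boolean crashOnFirstAttempt) {
        repository.clear();
        filesWritten.clear();
        boolean committed = deliver(messageId, crashOnFirstAttempt);
        while (!committed) {
            committed = deliver(messageId, false); // redelivery after restart
        }
        return filesWritten.size();
    }

    public static void main(String[] args) {
        System.out.println("no crash:   " + filesProducedFor("ID:1", false) + " file(s)");
        System.out.println("with crash: " + filesProducedFor("ID:1", true) + " file(s)");
    }
}
```

In the crash case the redelivered message is treated as a duplicate, so it is consumed and committed without a file ever being written.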
My current thinking is that the way to achieve the goal is to use XA
transactions alongside an XA-compliant file producer and a JTA transaction
manager. However, that does seem to preclude the use of the Camel file
component (which is not fully transactional, being “best efforts” based).
For the record, I’m using the Camel File component as the file producer with
local JMS transactions, an externally defined Spring transaction manager,
the Camel <transacted/> element within the route and the
FileIdempotentRepository. The FileIdempotentRepository filters out message
duplicates based on the JMSMessageID (it is configured with eager=true;
using false just shifts the problem).
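A rough plain-Java sketch of what "shifts the problem" means with eager=false. Again this is only a simulation with made-up names (LazyCrashDemo, deliver, fileWritesFor), not Camel code. It assumes that with eager=false the key is recorded only once processing has completed, so a crash after the file write but before that point leaves no record of the message.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation of the eager=false ordering; no Camel or JMS classes.
// All names here are hypothetical and exist only for this illustration.
class LazyCrashDemo {

    // Stands in for the idempotent store on disk; survives a simulated crash.
    static final Set<String> repository = new HashSet<>();
    // Records every file write operation; survives a simulated crash.
    static final List<String> fileWrites = new ArrayList<>();

    // One delivery attempt. Returns true if the JMS transaction "committed",
    // false if the JVM "crashed" first, in which case the broker redelivers.
    static boolean deliver(String messageId, boolean crashAfterFileWrite) {
        if (repository.contains(messageId)) {
            return true; // duplicate: filtered, consumed, committed
        }
        fileWrites.add(messageId + ".xml");
        if (crashAfterFileWrite) {
            // File is on disk, key not yet recorded, transaction not committed.
            return false;
        }
        repository.add(messageId); // key recorded only after processing
        return true; // transaction commit
    }

    // Delivers until committed, optionally crashing on the first attempt,
    // and reports how many file write operations took place.
    static int fileWritesFor(String messageId, boolean crashOnFirstAttempt) {
        repository.clear();
        fileWrites.clear();
        boolean committed = deliver(messageId, crashOnFirstAttempt);
        while (!committed) {
            committed = deliver(messageId, false); // redelivery after restart
        }
        return fileWrites.size();
    }

    public static void main(String[] args) {
        System.out.println("no crash:   " + fileWritesFor("ID:1", false) + " write(s)");
        System.out.println("with crash: " + fileWritesFor("ID:1", true) + " write(s)");
    }
}
```

Here the crash case writes the file twice rather than not at all (whether that shows up as two files or one overwritten file depends on how the file name is derived), so the guarantee is still broken, just in the other direction.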
The log output below highlights the point where a JVM crash will lead to
the message being consumed without a file being produced.
<code>
05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Created JMS
transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=false}
java.lang.Object@44eb7aa] from Connection [Shared JMS Connection:
ActiveMQConnection
{id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1,clientId=ID:ANAME-ABCDEFGH-63012-1459843265976-0:1,started=false}]
05 Apr 2016 09:06:27 TransactionContext DEBUG
Begin:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
05 Apr 2016 09:06:27 ultJmsMessageListenerContainer DEBUG Received message
of type [class org.apache.activemq.command.ActiveMQBytesMessage] from
consumer [ActiveMQMessageConsumer {
value=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1:244, started=true }] of
transactional session [Cached JMS Session: ActiveMQSession
{id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
java.lang.Object@44eb7aa]
05 Apr 2016 09:06:27 EndpointMessageListener DEBUG
Endpoint[inputQueue://ANAME.INV.RESP.Q] consumer received JMS message:
ActiveMQBytesMessage ...
05 Apr 2016 09:06:27 TransactionErrorHandler DEBUG Transaction
begin (0x2e900519) redelivered(false) for (MessageId:
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Participating in
existing transaction
05 Apr 2016 09:06:27 TransactionErrorHandler TRACE isRunAllowed()
-> true (Run allowed if we are not stopped/stopping)
...
05 Apr 2016 09:06:27 routeUnderTest INFO
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30: Queue message: <?xml
version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27 SendProcessor DEBUG >>>>
Endpoint[AcknowledgementProcessor]
Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27 AcknowledgementProcessor DEBUG Payload is ...
05 Apr 2016 09:06:27 AcknowledgementProcessor DEBUG Body is ...
05 Apr 2016 09:06:27 routeUnderTest INFO
queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
Processed message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27 DefaultStreamCachingStrategy DEBUG Should spool
cache 1023 -> false
...
05 Apr 2016 09:06:27 routeUnderTest INFO
queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
Validated message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27 FileIdempotentRepository DEBUG Appending
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 to idempotent filestore:
C:\Files\filestore\filesOut\.filestore.dat
*** If a JVM CRASH occurs here, BEFORE the JmsTransactionManager commit, then
the FileIdempotentRepository entry will remain BUT no file has been created
and none of the FileIdempotentRepository's after-processing operations will
occur. Upon restoration, the broker will redeliver and the message will be
consumed, but the FileIdempotentRepository will filter it out as a duplicate
and hence NO file will be produced. ***
05 Apr 2016 09:06:27 FilterProcessor DEBUG Filter matches:
false for exchange:
Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27 SendProcessor DEBUG >>>>
Endpoint[file://name/someResponses]
Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27 FileOperations DEBUG Using
InputStream to write file: name\someResponses\BuyDeal.xml
05 Apr 2016 09:06:27 GenericFileProducer DEBUG Wrote
[name\someResponses\BuyDeal.xml] to [Endpoint[file://name/someResponses]]
05 Apr 2016 09:06:27 routeUnderTest INFO
queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:BuyDeal.xml:
Completed Processing
05 Apr 2016 09:06:27 TransactionErrorHandler TRACE Is exchangeId:
ID-ANAME-ABCDEFGH-62996-1459843263591-0-22 interrupted? false
...
05 Apr 2016 09:06:27 TransactionErrorHandler DEBUG Transaction
commit (0x2e900519) redelivered(false) for (MessageId:
ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Initiating
transaction commit
05 Apr 2016 09:06:27 JmsTransactionManager DEBUG Committing JMS
transaction on Session [Cached JMS Session: ActiveMQSession
{id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
java.lang.Object@44eb7aa]
05 Apr 2016 09:06:27 ActiveMQSession DEBUG
ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1 Transaction Commit
:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
05 Apr 2016 09:06:27 TransactionContext DEBUG Commit:
TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4 syncCount: 2
</code>
Would appreciate views.
Thanks
Glen