I wrote a small piece of code by copying the logic of Loop and defined my own
'While'. As I have no experience whatsoever in contributing to Camel, and do
not know whether the proposed solution is to extend Loop or to rewrite the
existing one, I was asking whether there is a JIRA item.
There are two new classes (they could be named better, maybe
LoopToDefinition?):
- WhileDefinition
- WhileProcessor
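And two methods added to ProcessorDefinition:
- 'public void while(Expression)'
- 'public ExpressionClause<WhileDefinition> while()'
(Since 'while' is a reserved word in Java, the real method names would have to
differ; the names above only show the intent.)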
The only open question, 'do-while' versus 'while', could perhaps be decided
either by a header or by an additional property.
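To make the 'while' versus 'do-while' question concrete, here is a minimal
plain-Java sketch with no Camel dependency. The class name
WhileSemanticsSketch, the loop method and the checkFirst flag are made up for
illustration only; checkFirst stands in for whatever header or property would
select the behavior, and nothing here is taken from the Camel code base.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical illustration only; not part of Camel.
final class WhileSemanticsSketch {

    /**
     * Runs the body for as long as the condition holds.
     *
     * @param checkFirst true = 'while' (condition tested before the first iteration),
     *                   false = 'do-while' (body always runs once before the first test)
     * @return the number of iterations that were executed
     */
    static <T> int loop(T state, Predicate<T> condition, Consumer<T> body, boolean checkFirst) {
        int iterations = 0;
        if (!checkFirst) {
            // do-while: run the body once unconditionally
            body.accept(state);
            iterations++;
        }
        while (condition.test(state)) {
            body.accept(state);
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        // The condition is already false at the start, so the two variants differ.
        int asWhile = loop(new AtomicInteger(5), c -> c.get() < 5, AtomicInteger::incrementAndGet, true);
        int asDoWhile = loop(new AtomicInteger(5), c -> c.get() < 5, AtomicInteger::incrementAndGet, false);
        System.out.println("while: " + asWhile + ", do-while: " + asDoWhile); // while: 0, do-while: 1
    }
}
```

The two variants only differ when the condition is false on entry; when it
starts out true they run the same number of iterations.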
Regards,
Arpit.
Below are the two classes :)
///////////////////////////////
// BEGIN OF WhileProcessor //
///////////////////////////////
public class WhileProcessor extends DelegateAsyncProcessor implements Traceable {
private static final Logger LOG = LoggerFactory.getLogger(WhileProcessor.class);
private final Expression expression;
private final boolean copy;
public WhileProcessor(Processor processor, Expression expression, boolean
copy) {
super(processor);
this.expression = expression;
this.copy = copy;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// use atomic integer to be able to pass reference and keep track on the values
AtomicInteger index = new AtomicInteger();
AtomicBoolean atomicExpressionEvaluation = new AtomicBoolean();
try {
evaluateExpression(exchange, atomicExpressionEvaluation);
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
callback.done(true);
return true;
}
// we hold on to the original Exchange in case it's needed for copies
final Exchange original = exchange;
// per-iteration exchange
Exchange target = exchange;
// set the size before we start
exchange.setProperty(Exchange.LOOP_SIZE, index);
// loop synchronously
while (atomicExpressionEvaluation.get()) {
// and prepare for next iteration
// if (!copy) target = exchange; else copy of original
target = prepareExchange(exchange, index.get(), original);
boolean sync = process(target, callback, index, atomicExpressionEvaluation, original);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
// increment counter before next loop
index.getAndIncrement();
try {
evaluateExpression(exchange, atomicExpressionEvaluation);
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
callback.done(true);
return true;
}
}
// we are done so prepare the result
ExchangeHelper.copyResults(exchange, target);
LOG.trace("Processing complete for exchangeId: {} >>> {}",
exchange.getExchangeId(), exchange);
callback.done(true);
return true;
}
protected boolean process(final Exchange exchange, final AsyncCallback
callback,
final AtomicInteger index, final AtomicBoolean
atomicExpressionEvaluation,
final Exchange original) {
// set current index as property
LOG.debug("WhileProcessor: iteration #{}", index.get());
exchange.setProperty(Exchange.LOOP_INDEX, index.get());
boolean sync = processor.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of the routing slip
if (doneSync) {
return;
}
Exchange target = exchange;
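// increment index as we have just processed once
index.getAndIncrement();
// re-evaluate the condition, as the iteration that just completed may have changed it
try {
evaluateExpression(exchange, atomicExpressionEvaluation);
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
callback.done(false);
return;
}
// continue looping asynchronously
while (atomicExpressionEvaluation.get()) {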
// and prepare for next iteration
target = prepareExchange(exchange, index.get(), original);
// process again
boolean sync = process(target, callback, index, atomicExpressionEvaluation, original);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return;
}
// increment counter before next loop
index.getAndIncrement();
try {
evaluateExpression(exchange,
atomicExpressionEvaluation);
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
// we are inside the async callback, so signal asynchronous completion
callback.done(false);
return;
}
}
// we are done so prepare the result
ExchangeHelper.copyResults(exchange, target);
LOG.trace("Processing complete for exchangeId: {} >>> {}",
exchange.getExchangeId(), exchange);
callback.done(false);
}
});
return sync;
}
/**
* Prepares the exchange for the next iteration
*
* @param exchange the exchange
* @param index the index of the next iteration
* @return the exchange to use
*/
protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
if (copy) {
// use a copy but let it reuse the same exchange id so it appear as one exchange
// use the original exchange rather than the looping exchange (esp. with the async routing engine)
return ExchangeHelper.createCopy(original, true);
} else {
ExchangeHelper.prepareOutToIn(exchange);
return exchange;
}
}
public Expression getExpression() {
return expression;
}
public String getTraceLabel() {
return "while[" + expression + "]";
}
@Override
public String toString() {
return "While[" + expression + " do: " + getProcessor() + "]";
}
private void evaluateExpression(Exchange exchange, AtomicBoolean atomicExpressionEvaluation)
throws NoTypeConversionAvailableException {
// Intermediate conversion to String is needed when direct conversion to Boolean is not available
// but the evaluation result is a textual representation of a boolean value.
String expressionEvaluation = expression.evaluate(exchange, String.class);
boolean result = ExchangeHelper.convertToMandatoryType(exchange, Boolean.class, expressionEvaluation);
atomicExpressionEvaluation.set(result);
}
}
///////////////////////////////
// BEGIN OF WhileDefinition //
///////////////////////////////
@XmlRootElement(name = "while")
@XmlAccessorType(XmlAccessType.FIELD)
public class WhileDefinition extends ExpressionNode {
@XmlAttribute
private Boolean copy;
public WhileDefinition() {
}
public WhileDefinition(Expression expression) {
super(expression);
}
public WhileDefinition(ExpressionDefinition expression) {
super(expression);
}
/**
* Enables copy mode so a copy of the input Exchange is used for each iteration.
* That means each iteration will start from a copy of the same message.
* <p/>
* By default the same exchange is reused for every iteration, so each iteration
* may have different message content.
*
* @return the builder
*/
public WhileDefinition copy() {
setCopy(true);
return this;
}
public void setExpression(Expression expr) {
setExpression(ExpressionNodeHelper.toExpressionDefinition(expr));
}
public Boolean getCopy() {
return copy;
}
public void setCopy(Boolean copy) {
this.copy = copy;
}
public boolean isCopy() {
// do not copy by default to be backwards compatible
return copy != null ? copy : false;
}
@Override
public String toString() {
return "While[" + getExpression() + " -> " + getOutputs() + "]";
}
@Override
public String getLabel() {
return "while[" + getExpression() + "]";
}
@Override
public String getShortName() {
return "while";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
Processor output = this.createChildProcessor(routeContext,
true);
return new WhileProcessor(output,
getExpression().createExpression(routeContext), isCopy());
}
}
-----Original Message-----
From: Dale King [mailto:[email protected]]
Sent: Monday, September 30, 2013 10:01 PM
To: [email protected]
Subject: Re: Extend Camel Loop to support While Behavior
I thought for sure I read the Jira issue for adding a while loop since I
too had to figure out that work around, but I'll be darned if I can find
the Jira issue now.
I found a post from last year where Claus said, "There is a JIRA to make
the loop like a while loop so we can use a predicate to know if we should
continue looping or not. So someday in a future Camel release you can do
it." Perhaps that is what I am thinking of, but I cannot find the Jira that
Claus is referring to.
The concept for adding it is very simple. The one detail to work out is you
need to support while and do-while (whether the predicate is checked before
the first iteration or not).
On Mon, Sep 30, 2013 at 10:43 AM, Goyal, Arpit <[email protected]> wrote:
> Hi Dale,
>
> What you mean by use distribution list to set header to NULL? Can you
> share the example (Spring DSL if possible)
> Do you mean that we should change the value of Headers
> "LOOP_INDEX" & "LOOP_SIZE"?
>
> Is this the JIRA Item for the same? -->
> https://issues.apache.org/jira/browse/CAMEL-4564?jql=text%20~%20%22Loop%20EIP%22
> This propose to create new DSL element (loopTo)?
>
> Regards,
> Arpit.
>
> -----Original Message-----
> From: Dale King [mailto:[email protected]]
> Sent: Monday, September 30, 2013 7:24 PM
> To: [email protected]
> Subject: Re: Extend Camel Loop to support While Behavior
>
> There is a JIRA from a year or 2 ago that has no progress. This would be
> trivial to add.
>
> The work around is to use distribution list on a header and when you want
> to exit set that header to null.
>
> On Sep 30, 2013, at 2:28 AM, Claus Ibsen <[email protected]> wrote:
>
> > Hi
> >
> > Yeah we have talked about this in the past. Not sure if there is a
> > JIRA. You are welcome to look and if you cant find a JIRA then log a
> > new JIRA for this.
> >
> >
> >
> > On Mon, Sep 30, 2013 at 6:01 AM, Goyal, Arpit <[email protected]> wrote:
> >> Hi Claus,
> >>
> >> Is there a plan to support Camel Loop to support While behavior? Is there a jira issue for the same?
> >>
> >> Regards,
> >> Arpit.
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > Red Hat, Inc.
> > Email: [email protected]
> > Twitter: davsclaus
> > Blog: http://davsclaus.com
> > Author of Camel in Action: http://www.manning.com/ibsen
>
>
--
Dale King