Hello,

As a complement to the contribution of Romain (who is a colleague of mine, but
in a different team), I would like to submit another component to the Camel
project. This component splits XML inputs with streaming, which, according
to the documentation, is not possible yet. The rule for splitting is a
simple XPath-like path (for example //foo/bar), and the input source can be
a GenericFile or an InputStream.

The code is based on 3 classes, so I put it directly in this message (I just
excluded the JUnit tests here):

public class StaxExpressionBuilder implements Expression {

private static final Logger LOGGER = LoggerFactory
.getLogger(StaxExpressionBuilder.class);

/** The XPath value that input stream elements must match to be split. */
private final String path;

public StaxExpressionBuilder(String path) {
this.path = path;
}

@SuppressWarnings("unchecked")
@Override
public <T> T evaluate(Exchange exchange, Class<T> type) {
try {
Object body = exchange.getIn().getBody();
InputStream inputStream = null;
if (body instanceof GenericFile) {
GenericFile<File> file = (GenericFile<File>) body;
inputStream = new FileInputStream(file.getFile());
} else if (body instanceof InputStream) {
// Also accept a raw stream as message body.
inputStream = (InputStream) body;
}
if (inputStream != null) {
return (T) new StaxIterator(inputStream, path);
}
// Guard against a null body so that logging itself cannot throw.
LOGGER.error("No inputstream for message body of type "
+ (body == null ? "null" : body.getClass().getCanonicalName()));
} catch (FileNotFoundException e) {
LOGGER.error("Failed to read incoming file", e);
} catch (XMLStreamException e) {
LOGGER.error(
"Failed to create STaX iterator on incoming file message",
e);
}
return null;
}
}

--------------------------------
public class StaxIterator implements Iterator<String> {

private final AtomicInteger counter = new AtomicInteger(0);
private static final Logger LOGGER = LoggerFactory
.getLogger(StaxIterator.class);

private final XMLEventReader eventReader;
private final XPathLocation currentLocation = new XPathLocation();
private final List<String> matchPathes;
private final XMLInputFactory inputFactory = XMLInputFactory.newInstance();

private String nextItem = null;

public StaxIterator(InputStream inputStream, String pathes)
throws XMLStreamException {
this.matchPathes = new ArrayList<String>();
for (String path : pathes.split("\\|")) {
this.matchPathes.add(path.trim());
}
this.eventReader = inputFactory.createXMLEventReader(inputStream);
this.nextItem = readNextItem();
}

@Override
public boolean hasNext() {
return (nextItem != null);
}

@Override
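public String next() {
// Honour the Iterator contract (needs java.util.NoSuchElementException).
if (nextItem == null) {
throw new NoSuchElementException("No more items in the XML stream");
}
String currentItem = this.nextItem;
this.nextItem = readNextItem();
return currentItem;
}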

private String readNextItem() {
try {
StringBuilder itemBuilder = null;
boolean found = false;
String item = null;
while (eventReader.hasNext() && !found) {
XMLEvent event = eventReader.nextEvent();
if (event.isStartElement()) {
StartElement element = event.asStartElement();
String localName = element.getName().getLocalPart();
currentLocation.appendSegment(localName);
if (currentLocation.matches(matchPathes)) {
itemBuilder = new StringBuilder();
}
startRecording(itemBuilder, element);
} else if (event.isCharacters()) {
record(itemBuilder, event.asCharacters());
} else if (event.isEndElement()) {
// If we reach the end of an item element we stop recording.
endRecordingElement(itemBuilder, event.asEndElement());
if (currentLocation.matches(matchPathes)) {
found = true;
item = itemBuilder.toString();
counter.incrementAndGet();
}
currentLocation.removeLastSegment();
}
}
return item;
} catch (XMLStreamException e) {
LOGGER.error("Failed to read item #" + counter.get()
+ " from inputstream", e);
return null;
}
}

private void endRecordingElement(StringBuilder itemBuilder,
EndElement endElement) {
if (itemBuilder == null) {
return;
}
itemBuilder.append("</").append(endElement.getName().getLocalPart())
.append(">");
}

private void record(StringBuilder itemBuilder, Characters characters) {
if (itemBuilder == null) {
return;
}
itemBuilder.append(characters.getData());
}

private void startRecording(StringBuilder itemBuilder, StartElement element) {
if (itemBuilder == null) {
return;
}
itemBuilder.append("<").append(element.getName().getLocalPart());
@SuppressWarnings("unchecked")
Iterator<Attribute> attributes = element.getAttributes();
while (attributes.hasNext()) {
Attribute attr = attributes.next();
// Use the local part, as for elements; QName.toString() would add "{namespace}".
itemBuilder.append(" ").append(attr.getName().getLocalPart()).append("=\"")
.append(attr.getValue()).append("\"");
}
itemBuilder.append(">");
}

@Override
public void remove() {
throw new UnsupportedOperationException(
"remove() method is not supported by this Iterator, in the context of StAX "
+ "input reading only.");
}
}

--------------------------------
public class XPathLocation {

private static final String NODE_SEPARATOR = "/";

private static final String DOUBLE_NODE_SEPARATOR = "//";

/** location with initial value. */
private String location = NODE_SEPARATOR;

/**
 * Constructor
 */
public XPathLocation() {
super();
}

/**
 * Full Constructor.
 *
 * @param value
 *            initial value
 */
public XPathLocation(String value) {
super();
this.location = value;
}

public String getLocation() {
return location;
}

public String appendSegment(String segment) {
location = new StringBuilder(location).append(NODE_SEPARATOR)
.append(segment).toString();
location = location.replaceAll("//", "/");
return location;
}

public String removeLastSegment() {
location = StringUtils.substringBeforeLast(location, NODE_SEPARATOR);
if (location.isEmpty()) {
location = NODE_SEPARATOR;
}
return location;
}

/**
 * Returns true if one of the given pattern matches the current location,
 * false otherwise
 *
 * @param orPatterns
 *            the given patterns
 * @return true or false
 */
public boolean matches(final List<String> orPatterns) {
for (String pattern : orPatterns) {
if (matches(pattern)) {
return true;
}
}
return false;
}

/**
 * Returns true if the given pattern matches the current location, false
 * otherwise
 *
 * @param pattern
 *            the given pattern
 * @return true or false
 */
public boolean matches(final String pattern) {
if (pattern == null || pattern.isEmpty()) {
return false;
} else if (pattern.startsWith(NODE_SEPARATOR)) {
return matchStartWith(pattern);
} else if (pattern.contains(DOUBLE_NODE_SEPARATOR)) {
return matchContains(pattern);
} else {
String lastSegments = StringUtils.substringAfterLast(location,
pattern + NODE_SEPARATOR);
return (!lastSegments.isEmpty()) && location.endsWith(lastSegments)
&& !lastSegments.contains(NODE_SEPARATOR);
}
}

private boolean matchContains(String pattern) {
String firstSegments = StringUtils.substringBefore(pattern,
DOUBLE_NODE_SEPARATOR) + NODE_SEPARATOR;
String lastSegments = NODE_SEPARATOR
+ StringUtils.substringAfter(pattern, DOUBLE_NODE_SEPARATOR);

return location.contains(firstSegments)
&& location.endsWith(lastSegments)
&& location.indexOf(lastSegments, firstSegments.length()) >= (location
.indexOf(firstSegments) + firstSegments.length() - NODE_SEPARATOR
.length());
}

private boolean matchStartWith(String pattern) {
if (pattern.startsWith(DOUBLE_NODE_SEPARATOR)) {
// Keep the leading separator so that "//bar" does not match ".../foobar".
return location.endsWith(NODE_SEPARATOR
+ StringUtils.substringAfter(pattern, DOUBLE_NODE_SEPARATOR));
} else {
return pattern.equals(location);
}
}
}
--------------------------------

Here is how it is used in a route:

public class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() {
        from(file:..).split(stax("//foo/bar")).streaming().to(...);
    }

    private Expression stax(String path) {
        return new StaxExpressionBuilder(path);
    }
}

Here's how it works:
- when splitting the incoming message body, the stax() expression returns a
new type of Iterator.
- when streaming, the splitter calls the iterator's next() method. Using StAX
internally, the iterator advances through the input stream and keeps track of
the location of each element it traverses.
- when an element's location matches the given path, the iterator 'records'
the content of the input stream and returns it at the end of the element.
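
To make the steps above concrete, here is a minimal, self-contained sketch of
the path tracking and recording logic, using only the JDK's StAX API. It is an
illustration under simplifying assumptions, not the component itself: the
class name StaxSplitSketch and the split() helper are hypothetical, the
pattern is treated as a plain path suffix such as "/foo/bar", attributes are
dropped, text is not re-escaped, and the items are collected into a list
rather than returned one by one from an Iterator.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;

// Hypothetical helper, for illustration only.
class StaxSplitSketch {

    /** Returns the serialized form of every element whose location ends with pathSuffix. */
    static List<String> split(InputStream in, String pathSuffix) {
        List<String> items = new ArrayList<>();
        StringBuilder path = new StringBuilder(); // current location, e.g. "/root/foo/bar"
        StringBuilder item = null;                // non-null while an item is being recorded
        int depth = 0;                            // nesting depth of the current element
        int recordDepth = -1;                     // depth at which recording started
        try {
            XMLEventReader reader = XMLInputFactory.newInstance().createXMLEventReader(in);
            while (reader.hasNext()) {
                XMLEvent event = reader.nextEvent();
                if (event.isStartElement()) {
                    String name = event.asStartElement().getName().getLocalPart();
                    path.append('/').append(name);
                    depth++;
                    // Start recording when the location matches and nothing is being recorded.
                    if (item == null && path.toString().endsWith(pathSuffix)) {
                        item = new StringBuilder();
                        recordDepth = depth;
                    }
                    if (item != null) {
                        item.append('<').append(name).append('>');
                    }
                } else if (event.isCharacters() && item != null) {
                    item.append(event.asCharacters().getData());
                } else if (event.isEndElement()) {
                    String name = event.asEndElement().getName().getLocalPart();
                    if (item != null) {
                        item.append("</").append(name).append('>');
                        // The element that started the recording is closed: emit the item.
                        if (depth == recordDepth) {
                            items.add(item.toString());
                            item = null;
                            recordDepth = -1;
                        }
                    }
                    depth--;
                    // Drop the last segment of the location.
                    path.setLength(path.lastIndexOf("/"));
                }
            }
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Failed to read XML input", e);
        }
        return items;
    }

    public static void main(String[] args) {
        String xml = "<root><foo><bar>1</bar><bar>2</bar></foo><bar>x</bar></root>";
        InputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        // Only the two bar elements located under foo are emitted.
        System.out.println(split(in, "/foo/bar"));
    }
}
```

Tracking the depth at which recording started avoids closing an item too
early when a nested element happens to match the same path.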

Note that the stax() method is part of my RouteBuilder, but it could be
moved to the RouteBuilder super class for a generic usage.


What do you think about it?
Is this something you're interested in?

Best regards,
Xavier

On Fri, May 13, 2011 at 8:21 AM, Romain Manni-Bucau
<[email protected]>wrote:

> Hi,
>
> thank you Richard and Claus for your feedbacks.
>
> I modified the classloading stuff, the NPE catch and added the XMLUtil
> class
> to get the tag name.
>
> I added support for input stream as input (adding some converters) but the
> problem is that camel already have a lot of converters and you can load
> back
> the whole file very fast if you don't take care.
>
> - Romain
>
> 2011/5/13 Claus Ibsen <[email protected]>
>
> > Hi
> >
> > Yeah it does look very cool. Good work.
> >
> > Would be great if the StaxComponent could also cater for non file
> > based inputs. You may have the message body as a Source already. But
> > that can always be improved.
> >
> > And yes as Richard mention the class loading should use the
> > ClassResolver. You can get it from the CamelContext. exchange -> camel
> > context -> class resolver.
> >
> > And the stuff that finds the annotations. We may have some common code
> > for that. Or later refactor that into a util class.
> >
> > Anyway keep it up.
> >
> >
> > On Fri, May 13, 2011 at 1:29 AM, Richard Kettelerij
> > <[email protected]> wrote:
> > > Hi Romain,
> > >
> > > Nice work. I've taken a look at your component. A few minor suggestions
> > for
> > > improvement, in case you want to contribute it to Apache:
> > >
> > > - The component currently uses getContextClassLoader().loadClass() for
> > > classloading. Camel actually has a abstraction to make this portable
> > across
> > > various runtime environments. You can just replace it with
> > > org.apache.camel.spi.ClassResolver().resolveClass().
> > >
> > > - Avoid catching the NullPointException in the
> > StAXJAXBIteratorExpression.
> > >
> > > - Do you plan to add a DSL method for the StAXJAXBIteratorExpression
> > > (requires patching camel-core)? So you can write for example
> > > "split(stax(Record.class))" in your route.
> > >
> > > Regards,
> > > Richard
> > >
> > > On Thu, May 12, 2011 at 5:55 PM, Romain Manni-Bucau
> > > <[email protected]>wrote:
> > >
> > >> Hi all,
> > >>
> > >> i worked a bit around stax (thanks to claus for its advices).
> > >>
> > >> You can find what i've done here:
> > >> http://code.google.com/p/rmannibucau/source/browse/camel/camel-stax/
> > >>
> > >> The test show what can be done with it:
> > >>
> > >>
> >
> http://code.google.com/p/rmannibucau/source/browse/camel/camel-stax/src/test/java/org/apache/camel/stax/test/StAXRouteTest.java
> > >>
> > >>   - validation using sax (just need a converter)
> > >>   - parsing using a sax contenthandler and a stax stream reader (a
> > simple
> > >>   component)
> > >>   - parsing of sub tree to get jaxb objects using a stax event reader
> > for
> > >>   the whole tree and jaxb for the sub objects
> > >>
> > >>
> > >> - Romain
> > >>
> > >
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > FuseSource
> > Email: [email protected]
> > Web: http://fusesource.com
> > CamelOne 2011: http://fusesource.com/camelone2011/
> > Twitter: davsclaus
> > Blog: http://davsclaus.blogspot.com/
> > Author of Camel in Action: http://www.manning.com/ibsen/
> >
>



-- 
Xavier
