I tried building my own DF and that works (for me)
Jan
package org.apache.camel.dataformat;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import javax.activation.DataHandler;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.FileUtil;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class SaveHeaderTest extends CamelTestSupport {
private static File dataFile = new File("output/message.ser");
DataFormat customDataFormat = new CustomDataFormat();
@After
@Before
public void cleanup() {
FileUtil.deleteFile(dataFile);
}
@SuppressWarnings("unchecked")
@Test
public void save() throws FileNotFoundException, Exception {
File writtenTo = new File("output/message.ser");
assertFalse(writtenTo.exists());
Exchange exchange = ExchangeBuilder.anExchange(context)
.withBody("Hello World")
.withHeader("from", "Apache Camel")
.withHeader("test", "save")
.withHeader(Exchange.FILE_NAME,
"message.ser")
.build();
template.send("direct:save", exchange);
assertTrue(writtenTo.exists());
// actual
ObjectInputStream in = new ObjectInputStream(new
FileInputStream(dataFile));
Object body = in.readObject();
Map<String,Object> headers = (Map<String, Object>)
in.readObject();
Map<String,DataHandler> attachments = (Map<String,
DataHandler>) in.readObject();
String messageId = (String) in.readObject();
boolean fault = (boolean) in.readObject();
assertEquals("Hello World", body);
assertEquals("Apache Camel", headers.get("from"));
assertEquals("save", headers.get("test"));
assertTrue(attachments.isEmpty());
assertEquals(exchange.getIn().getMessageId(), messageId);
assertFalse(fault);
}
@Test
public void load() throws Exception {
// create test data
ObjectOutputStream out = new ObjectOutputStream(new
FileOutputStream(dataFile));
out.writeObject("Hello World");
Map<String,Object> headers = new HashMap<String, Object>();
headers.put("from", "Apache Camel");
headers.put("test", "load");
out.writeObject(headers);
Map<String,DataHandler> attachments = new
HashMap<String,DataHandler>();
out.writeObject(attachments);
out.writeObject("message-id");
out.writeObject(false);
out.close();
// load the saved 'exchange'
Exchange loadCommand = ExchangeBuilder.anExchange(context)
.withBody(dataFile.getAbsolutePath())
.build();
Exchange response = template.send("direct:load",
loadCommand);
assertEquals("Hello World",
response.getOut().getBody(String.class));
assertEquals("Apache Camel",
response.getOut().getHeader("from"));
assertEquals("load", response.getOut().getHeader("test"));
}
@Test
public void turnaround() {
Exchange exchange = ExchangeBuilder.anExchange(context)
.withBody("Hello World")
.withHeader("from", "Apache Camel")
.build();
Exchange response = template.send("direct:turnaround",
exchange);
assertEquals("Hello World",
response.getOut().getBody(String.class));
assertEquals("Apache Camel",
response.getOut().getHeader("from"));
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:save")
.marshal(customDataFormat)
.to("file:output");
from("direct:load")
.process(openFile())
.unmarshal(customDataFormat);
from("direct:turnaround")
.marshal(customDataFormat)
.unmarshal(customDataFormat);
}
private Processor openFile() {
return new Processor() {
@Override
public void process(Exchange
exchange) throws Exception {
String filename =
exchange.getIn().getBody(String.class);
FileInputStream stream = new
FileInputStream(filename);
exchange.getIn().setBody(stream);
}
};
}
};
}
class CustomDataFormat implements DataFormat {
@Override
public void marshal(Exchange exchange, Object graph,
OutputStream stream)
throws Exception {
ObjectOutputStream out = null;
try {
out = new ObjectOutputStream(stream);
// Save the body
out.writeObject(graph);
// Save the message
Message message = exchange.getIn();
if (message != null) {
// Message is not serializable, so
save the data individually
out.writeObject(message.getHeaders());
out.writeObject(message.getAttachments());
out.writeObject(message.getMessageId());
out.writeObject(message.isFault());
}
} finally {
IOUtils.closeQuietly(out);
}
}
@SuppressWarnings("unchecked")
@Override
public Object unmarshal(Exchange exchange, InputStream
stream)
throws Exception {
ObjectInputStream in = null;
try {
in = new ObjectInputStream(stream);
// read the raw data
Object body = in.readObject();
Map<String,Object> headers = (Map<String,
Object>) in.readObject();
Map<String,DataHandler> attachments =
(Map<String, DataHandler>) in.readObject();
String messageId = (String) in.readObject();
boolean fault = (boolean) in.readObject();
// build the message
Message msg = new DefaultMessage();
msg.setBody(body);
msg.setAttachments(attachments);
msg.setHeaders(headers);
msg.setMessageId(messageId);
msg.setFault(fault);
return msg;
} finally {
IOUtils.closeQuietly(in);
}
}
}
}
> -----Ursprüngliche Nachricht-----
> Von: Jan Matèrne (jhm) [mailto:[email protected]]
> Gesendet: Donnerstag, 19. September 2013 07:06
> An: [email protected]
> Betreff: AW: How to set a header in custom DataFormat?
>
> What I have found is, that the exchange object which is passed to the
> DataFormat is only used for getting the CamelContext.
> I searched a little bit further and found a processor for
> unmarshalling:
>
> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
> AsyncCallback)
>
> And there is a note
>
> Object result = dataFormat.unmarshal(exchange, stream);
> if (result instanceof Exchange) {
> if (result != exchange) {
> // it's not allowed to return another exchange other than the
> one provided to dataFormat
> throw new RuntimeCamelException("The returned exchange " +
> result + " is not the same as " + exchange + " provided to the
> DataFormat");
>
>
> The logic of that processor is:
> - get the object from the stream
> - if it is an exchange, throw that exception
> - if it is a message, store as 'out' on the exchange-parameter
> - if it is something else, store it as body of the 'out' message
>
>
> So have you tried setting the header on the 'out' message?
>
>
> Jan
>
>
>
> > -----Ursprüngliche Nachricht-----
> > Von: Chris [mailto:[email protected]]
> > Gesendet: Mittwoch, 18. September 2013 22:58
> > An: [email protected]
> > Betreff: How to set a header in custom DataFormat?
> >
> > I implemented a custom DataFormat and in the "unmarshal(Exchange
> > exchange, InputStream stream)" implementation I set a header, but
> upon
> > attempting to retrieve the header downstream from the "unmarshal"
> > call, the header is not there. Since the pattern is inOnly, I added
> > the header to the in Message. I've done similar with custom
> > Processors and in that case, it works. What is different with
> > DataFormat? Is there any way to set header(s) from DataFormat
> marshal/unmarshal?
> >
> > Thanks,
> >
> > Chris