Claus,
Just thought I would follow up.
I don't know if this counts as a contribution, but I ended up implementing
this as a pair of splitters, wired into the routes like this:
from(fromUri)
    .split().method(OuterZipFileDecompressingSplitter.class, "split")
    .streaming()
    .parallelProcessing()
    .to("direct:innerZipFile");

from("direct:innerZipFile")
    .split().method(InnerZipFileDecompressingSplitter.class, "split")
    .streaming()
    .parallelProcessing()
    .to("direct:furtherProcessing");
The splitters are:
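// Imports assume Camel 2.x package names.
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class InnerZipFileDecompressingSplitter {

    // Turns each file inside the inner zip into one message with a String body.
    public List<Message> split(InputStream is) throws Exception {
        List<Message> messages = new ArrayList<Message>();
        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));
        try {
            Map<String, String> entries = getUncompressedZipEntries(zis);
            for (Map.Entry<String, String> mapEntry : entries.entrySet()) {
                DefaultMessage message = new DefaultMessage();
                message.setHeader("Unzipped-File-Name", mapEntry.getKey());
                message.setBody(mapEntry.getValue());
                messages.add(message);
            }
        } finally {
            zis.close();
        }
        return messages;
    }

    // Reads every file entry fully into memory, keyed by entry name.
    private Map<String, String> getUncompressedZipEntries(ZipInputStream zis) throws Exception {
        Map<String, String> entries = new HashMap<String, String>();
        ZipEntry entry;
        while ((entry = zis.getNextEntry()) != null) {
            // Skip directory entries, as the outer splitter does.
            if (entry.isDirectory()) {
                continue;
            }
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            IOHelper.copy(zis, bos);
            // Note: toString() decodes with the platform default charset.
            entries.put(entry.getName(), bos.toString());
            zis.closeEntry();
        }
        return entries;
    }
}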
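// Imports assume Camel 2.x package names.
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class OuterZipFileDecompressingSplitter {

    // Turns each inner zip inside the outer zip into one message with a byte[] body.
    public List<Message> split(InputStream is) throws Exception {
        List<Message> messages = new ArrayList<Message>();
        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));
        try {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                DefaultMessage message = new DefaultMessage();
                message.setHeader("Inner-Zip-Name", entry.getName());
                message.setBody(getZipEntryAsByteArray(zis));
                messages.add(message);
                zis.closeEntry();
            }
        } finally {
            zis.close();
        }
        return messages;
    }

    // Copies the current entry's decompressed bytes into memory.
    private byte[] getZipEntryAsByteArray(ZipInputStream zis) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        IOHelper.copy(zis, bos);
        return bos.toByteArray();
    }
}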
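
For anyone who wants to try the nested-unzip logic outside a route, here is a
minimal sketch that uses only the JDK. The class name NestedZipSketch and the
helpers zip and unzip are made up for illustration and are not part of the
splitters above. Like the splitters, it buffers every entry fully in memory, so
it is only a rough stand-in for the two passes, not a drop-in replacement.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper class, for illustration only.
public class NestedZipSketch {

    // Reads every non-directory entry of a zip stream into memory, keyed by entry name.
    static Map<String, byte[]> unzip(InputStream is) throws IOException {
        Map<String, byte[]> entries = new LinkedHashMap<String, byte[]>();
        try (ZipInputStream zis = new ZipInputStream(is)) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = zis.read(buffer)) != -1) {
                    bos.write(buffer, 0, n);
                }
                entries.put(entry.getName(), bos.toByteArray());
            }
        }
        return entries;
    }

    // Builds a zip archive in memory from name -> content pairs (test data only).
    static byte[] zip(Map<String, byte[]> files) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            for (Map.Entry<String, byte[]> file : files.entrySet()) {
                zos.putNextEntry(new ZipEntry(file.getKey()));
                zos.write(file.getValue());
                zos.closeEntry();
            }
        }
        // The archive is complete only after the ZipOutputStream is closed.
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> innerFiles = new LinkedHashMap<String, byte[]>();
        innerFiles.put("a.txt", "hello".getBytes(StandardCharsets.UTF_8));
        innerFiles.put("b.txt", "world".getBytes(StandardCharsets.UTF_8));

        Map<String, byte[]> outerFiles = new LinkedHashMap<String, byte[]>();
        outerFiles.put("inner.zip", zip(innerFiles));
        byte[] outerZip = zip(outerFiles);

        // Outer pass: one byte[] per inner zip. Inner pass: one decoded String per file.
        for (Map.Entry<String, byte[]> inner : unzip(new ByteArrayInputStream(outerZip)).entrySet()) {
            for (Map.Entry<String, byte[]> file : unzip(new ByteArrayInputStream(inner.getValue())).entrySet()) {
                System.out.println(inner.getKey() + " -> " + file.getKey() + ": "
                        + new String(file.getValue(), StandardCharsets.UTF_8));
            }
        }
    }
}
```

The sketch decodes the inner files as UTF-8 explicitly, which is an assumption
about the data; the splitters above use the platform default charset instead.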
--
View this message in context:
http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p4429800.html
Sent from the Camel - Users mailing list archive at Nabble.com.