Claus,

Just thought I would follow up.  

I don't know if this counts as a contribution, but I ended up implementing
this as a pair of splitters, as in:

        from(fromUri)
                .split().method(OuterZipFileDecompressingSplitter.class, "split")
                .streaming()
                .parallelProcessing()
                .to("direct:innerZipFile");

        from("direct:innerZipFile")
                .split().method(InnerZipFileDecompressingSplitter.class, "split")
                .streaming()
                .parallelProcessing()
                .to("direct:furtherProcessing");

The splitters are:


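// Imports use Camel 2.x package names.
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class InnerZipFileDecompressingSplitter {

    // Returns one message per zip entry: the entry name goes into a header,
    // the uncompressed text into the body.
    public List<Message> split(InputStream is) throws Exception {
        List<Message> messages = new ArrayList<Message>();

        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));

        try {

            Map<String, String> entries = getUncompressedZipEntries(zis);

            for (Map.Entry<String, String> mapEntry : entries.entrySet()) {

                DefaultMessage message = new DefaultMessage();

                message.setHeader("Unzipped-File-Name", mapEntry.getKey());
                message.setBody(mapEntry.getValue());

                messages.add(message);
            }

        } finally {
            zis.close();
        }

        return messages;
    }

    // Reads every entry fully into memory, keyed by entry name.
    private Map<String, String> getUncompressedZipEntries(ZipInputStream zis) throws Exception {
        Map<String, String> entries = new HashMap<String, String>();

        ZipEntry entry;

        while ((entry = zis.getNextEntry()) != null) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            IOHelper.copy(zis, bos);
            // Note: toString() decodes with the platform default charset.
            entries.put(entry.getName(), bos.toString());
            zis.closeEntry();
        }

        return entries;
    }

}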


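// Imports use Camel 2.x package names.
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.util.IOHelper;

public class OuterZipFileDecompressingSplitter {

    // Returns one message per inner zip file: the entry name goes into a
    // header, the raw bytes of the inner zip into the body.
    public List<Message> split(InputStream is) throws Exception {

        List<Message> messages = new ArrayList<Message>();

        ZipInputStream zis = new ZipInputStream(new BufferedInputStream(is));

        try {

            ZipEntry entry;

            while ((entry = zis.getNextEntry()) != null) {
                // Directory entries carry no data; getNextEntry() closes them.
                if (entry.isDirectory()) {
                    continue;
                }

                DefaultMessage message = new DefaultMessage();
                message.setHeader("Inner-Zip-Name", entry.getName());
                message.setBody(getZipEntryAsByteArray(zis));

                messages.add(message);

                zis.closeEntry();
            }

        } finally {
            zis.close();
        }

        return messages;
    }


    // Copies the current entry's uncompressed content into a byte array.
    private byte[] getZipEntryAsByteArray(ZipInputStream zis) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        IOHelper.copy(zis, bos);

        return bos.toByteArray();
    }

}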
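
One caveat with the splitters above: each builds its whole List in memory before returning, so for large archives every entry is held at once regardless of .streaming(). Below is a minimal sketch of a lazy alternative that uses only java.util.zip. The class name LazyZipEntryIterator, its lastName() accessor and the sampleZip() demo helper are hypothetical and not part of Camel. It assumes the splitter will iterate over an Iterator returned from a bean method, which should be checked against the Camel version in use. Each individual entry is still buffered fully in next().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical helper: walks a zip stream one entry at a time.
public class LazyZipEntryIterator implements Iterator<byte[]> {

    private final ZipInputStream zis;
    private ZipEntry current;   // entry waiting to be read by next()
    private String lastName;    // name of the entry most recently returned
    private boolean done;

    public LazyZipEntryIterator(InputStream is) {
        this.zis = new ZipInputStream(is);
    }

    @Override
    public boolean hasNext() {
        if (done) return false;
        if (current != null) return true;
        try {
            // Advance to the next non-directory entry, if any.
            do {
                current = zis.getNextEntry();
            } while (current != null && current.isDirectory());
            if (current == null) {
                done = true;
                zis.close();    // end of archive: release the stream
            }
            return current != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public byte[] next() {
        if (!hasNext()) throw new NoSuchElementException();
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            // read() returns -1 at the end of the current entry.
            while ((n = zis.read(buf)) != -1) bos.write(buf, 0, n);
            lastName = current.getName();
            current = null;
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public String lastName() {
        return lastName;
    }

    // Builds a tiny two-entry zip in memory, for demonstration only.
    public static byte[] sampleZip() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ZipOutputStream zos = new ZipOutputStream(bos)) {
                zos.putNextEntry(new ZipEntry("a.txt"));
                zos.write("alpha".getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
                zos.putNextEntry(new ZipEntry("b.txt"));
                zos.write("beta".getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LazyZipEntryIterator it =
                new LazyZipEntryIterator(new ByteArrayInputStream(sampleZip()));
        while (it.hasNext()) {
            byte[] body = it.next();
            System.out.println(it.lastName() + " -> "
                    + new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

In a splitter bean, the split method would wrap each returned byte[] in a message (setting the header from lastName()) as the entries are pulled, instead of collecting them into a List first.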


--
View this message in context: 
http://camel.465427.n5.nabble.com/FTP-to-HDFS-large-gzipped-files-tp3192431p4429800.html
Sent from the Camel - Users mailing list archive at Nabble.com.