This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 1920a0de7e9e CAMEL-22721 Use CachedOutputStream to enable streaming of 
large zip file (#21271)
1920a0de7e9e is described below

commit 1920a0de7e9ed00cf08f0ee1d1154e2fdd9ded5e
Author: lantzen <[email protected]>
AuthorDate: Fri Feb 6 10:54:15 2026 +0100

    CAMEL-22721 Use CachedOutputStream to enable streaming of large zip file 
(#21271)
    
    * CAMEL-22721 Use CachedOutputStream to enable streaming of large zip files
    
    * CAMEL-22721 Updated with feedback from PR
    
    * CAMEL-22721 Updated with feedback from PR
    
    * CAMEL-22721 Formatted code
---
 .../camel/dataformat/zipfile/ZipIterator.java      |  31 +++-
 .../zipfile/ZipFileMultipleFilesSplitterTest.java  |   5 +-
 .../dataformat/zipfile/ZipSplitterRouteTest.java   | 197 ++++++++++++++++++++-
 3 files changed, 215 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
 
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
index 195829583248..45228890dd11 100644
--- 
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
+++ 
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
@@ -16,13 +16,20 @@
  */
 package org.apache.camel.dataformat.zipfile;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Objects;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.converter.stream.CachedOutputStream;
 import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.util.IOHelper;
 import org.apache.commons.compress.archivers.ArchiveException;
@@ -44,6 +51,7 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
     private boolean allowEmptyDirectory;
     private volatile ZipArchiveInputStream zipInputStream;
     private volatile ZipArchiveEntry currentEntry;
+    private volatile List<CachedOutputStream> cachedOutputStreamsToClose = new 
ArrayList<>();
     private volatile Message parent;
     private volatile boolean first;
 
@@ -57,7 +65,7 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
             zipInputStream = zipArchiveInputStream;
         } else {
             try {
-                ArchiveInputStream input = new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
+                ArchiveInputStream<?> input = new 
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
                         new BufferedInputStream(inputStream));
                 zipInputStream = (ZipArchiveInputStream) input;
             } catch (ArchiveException e) {
@@ -130,12 +138,13 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
             currentEntry = getNextEntry();
 
             if (currentEntry != null) {
-                LOG.debug("read zipEntry {}", currentEntry.getName());
+                String zipFileName = currentEntry.getName();
+                LOG.debug("read zipEntry {}", zipFileName);
 
                 Message answer = new DefaultMessage(exchange.getContext());
                 answer.getHeaders().putAll(exchange.getIn().getHeaders());
-                answer.setHeader("zipFileName", currentEntry.getName());
-                answer.setHeader(Exchange.FILE_NAME, currentEntry.getName());
+                answer.setHeader("zipFileName", zipFileName);
+                answer.setHeader(Exchange.FILE_NAME, zipFileName);
                 if (currentEntry.isDirectory()) {
                     if (allowEmptyDirectory) {
                         answer.setBody(new ByteArrayInputStream(new byte[0]));
@@ -143,10 +152,10 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
                         return getNextElement(); // skip directory
                     }
                 } else {
-                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                    IOHelper.copy(zipInputStream, baos);
-                    byte[] data = baos.toByteArray();
-                    answer.setBody(new ByteArrayInputStream(data));
+                    CachedOutputStream cos = new CachedOutputStream(exchange);
+                    IOHelper.copy(zipInputStream, cos);
+                    answer.setBody(cos.getInputStream());
+                    cachedOutputStreamsToClose.add(cos);
                 }
 
                 return answer;
@@ -192,6 +201,10 @@ public class ZipIterator implements Iterator<Message>, 
Closeable {
         IOHelper.close(zipInputStream);
         zipInputStream = null;
         currentEntry = null;
+
+        for (CachedOutputStream cos : cachedOutputStreamsToClose) {
+            IOHelper.close(cos);
+        }
     }
 
     public boolean isSupportIteratorForEmptyDirectory() {
diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
 
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
index 7a466f5b1ac4..9572636ef3e1 100644
--- 
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
+++ 
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
@@ -22,12 +22,12 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
 
-public class ZipFileMultipleFilesSplitterTest extends ZipSplitterRouteTest {
+public class ZipFileMultipleFilesSplitterTest extends CamelTestSupport {
     static final String PROCESSED_FILES_HEADER_NAME = "processedFiles";
 
-    @Override
     @Test
     public void testSplitter() throws InterruptedException {
         MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
@@ -80,5 +80,4 @@ public class ZipFileMultipleFilesSplitterTest extends 
ZipSplitterRouteTest {
 
         };
     }
-
 }
diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
 
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
index 7fff60823e53..fad5cbed8ec4 100644
--- 
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
+++ 
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
@@ -16,36 +16,221 @@
  */
 package org.apache.camel.dataformat.zipfile;
 
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ZipSplitterRouteTest extends CamelTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ZipIterator.class);
+
+    @TempDir
+    static File testDirectory;
+
+    private final AtomicInteger memoryMbBefore = new AtomicInteger(0);
+    private final AtomicInteger maxMemoryMbInside = new AtomicInteger(0);
+
+    private static File largeTestFile = null;
+    private static Integer largeTestFileSizeMb = null;
+    private static String expectedLargeTestFileBody = null;
+
+    private File testSpoolDirectory = null;
+    private Set<String> testSpoolDirectoryFileNamesInSplit = new HashSet<>();
 
     @Test
-    public void testSplitter() throws InterruptedException {
+    @Order(1)
+    public void testSplitter() throws InterruptedException, IOException {
+        File srcFile = new 
File("src/test/resources/org/apache/camel/dataformat/zipfile/data/resources.zip");
+        File testFile = new File(testDirectory, "in/" + srcFile.getName());
+
+        FileUtils.copyFile(srcFile, testFile);
+
         MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
         processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola", 
"another_chiau", "another_hi");
         MockEndpoint.assertIsSatisfied(context);
     }
 
+    /**
+     * Test that ZipSplitter reads the whole files in the zip file into memory 
when Spool is Disabled in the Stream
+     * Caching Strategy
+     */
+    @Test
+    @Order(2)
+    public void testSplitterLargeFileWithoutSpoolEnabled() throws 
InterruptedException, IOException {
+        File testFile = new File(testDirectory, "in/large1.zip");
+
+        memoryMbBefore.set(0);
+        maxMemoryMbInside.set(0);
+
+        FileUtils.copyFile(largeTestFile, testFile);
+
+        int diff = testSplitterLargeFile(testFile);
+
+        assertThat("Memory spike NOT detected! " + diff + "MB increased.", 
diff, greaterThan(largeTestFileSizeMb));
+    }
+
+    /**
+     * Test that ZipSplitter doesn't read the whole files in the zip file into 
memory when Spool is Enabled in the
+     * Stream Caching Strategy
+     */
+    @Test
+    @Order(3)
+    public void testSplitterLargeFileWithSpoolEnabled() throws 
InterruptedException, IOException {
+        File testFile = new File(testDirectory, "in/large2.zip");
+        MockEndpoint onCompletion = getMockEndpoint("mock:onCompletion");
+
+        onCompletion.expectedMessageCount(1);
+
+        memoryMbBefore.set(0);
+        maxMemoryMbInside.set(0);
+
+        FileUtils.copyFile(largeTestFile, testFile);
+
+        testSpoolDirectoryFileNamesInSplit.clear();
+
+        int diff = testSplitterLargeFile(testFile);
+
+        assertThat("Memory spike detected! " + diff + "MB increased.", diff, 
lessThan(largeTestFileSizeMb));
+
+        assertThat("No files in SpoolDirectory during split: " + 
testSpoolDirectory.getPath(),
+                testSpoolDirectoryFileNamesInSplit.size(), greaterThan(0));
+        assertEquals(0, testSpoolDirectory.list().length,
+                "Files in SpoolDirectory after test: " + 
testSpoolDirectory.getPath());
+    }
+
+    private int testSplitterLargeFile(File testFile) throws IOException, 
FileNotFoundException, InterruptedException {
+        MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
+        
processZipEntry.expectedBodiesReceivedInAnyOrder(expectedLargeTestFileBody);
+        MockEndpoint.assertIsSatisfied(context);
+
+        int before = memoryMbBefore.get();
+        int inside = maxMemoryMbInside.get();
+        int diff = inside - before;
+
+        LOG.info("Memory before {}MB, inside {}MB & diff {}MB", before, 
inside, diff);
+
+        return diff;
+    }
+
+    @Override
+    protected void setupResources() {
+        largeTestFile = new File(testDirectory, "large.zip");
+
+        try (OutputStream os = new FileOutputStream(largeTestFile);
+             ZipOutputStream zos = new ZipOutputStream(os)) {
+            zos.putNextEntry(new ZipEntry("test.txt"));
+            byte[] chunk = new byte[5 * 1024];
+            int bytesWritten = 0;
+            Arrays.fill(chunk, (byte) 'A');
+
+            expectedLargeTestFileBody = new String(chunk, 0, 20);
+            for (int i = 1; i <= 1024; i++) {
+                zos.write(chunk);
+                bytesWritten += chunk.length;
+            }
+            zos.closeEntry();
+            zos.flush();
+
+            largeTestFileSizeMb = bytesWritten / 1024 / 1024;
+            LOG.info("Generated large test file with entry of size {}KB", 
bytesWritten / 1024);
+        } catch (IOException e) {
+            throw new RuntimeException("Generate large test file", e);
+        }
+
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        if 
("testSplitterLargeFileWithSpoolEnabled()".equals(contextManagerExtension.getCurrentTestName()))
 {
+            StreamCachingStrategy streamCachingStrategy = 
context.getStreamCachingStrategy();
+            streamCachingStrategy.setSpoolEnabled(true);
+
+            testSpoolDirectory = new File(testDirectory, "spool");
+            testSpoolDirectory.mkdir();
+            streamCachingStrategy.setSpoolDirectory(testSpoolDirectory);
+        }
+
+        return context;
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             @Override
             public void configure() {
                 // Unzip file and Split it according to FileEntry
-                
from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true")
-                        .log("Start processing big file: 
${header.CamelFileName}")
-                        .split(new ZipSplitter()).streaming()
+                from("file:"+testDirectory.getPath()+"/in")
+                       .onCompletion()
+                               .to("mock:onCompletion")
+                       .end()
+                    .process(exchange -> captureMemory(memoryMbBefore, 
"BEFORE"))
+                    .log("Start processing big file: ${header.CamelFileName}")
+                    .split(new ZipSplitter()).streaming()
+                    .setBody().message(message -> { // Convert up to 20 bytes 
of body to string
+                        try {
+                            InputStream is = 
message.getBody(InputStream.class);
+                                byte buf[] = new byte[20];
+                                int bytesRead = is.read(buf);
+                                if (20 == bytesRead) { // No need to compare 
memory for all small files in zip file of "testSplitter()"
+                                    captureMemory(maxMemoryMbInside, "INSIDE");
+                                }
+                                return new String(buf, 0, bytesRead);
+                            } catch (IOException e) {
+                                throw new RuntimeException("Failed to convert 
body to String", e);
+                            } finally {
+                                if (null != testSpoolDirectory && 
testSpoolDirectory.exists()) {
+                                    for (String fileName : 
testSpoolDirectory.list()) {
+                                        
testSpoolDirectoryFileNamesInSplit.add(fileName);
+                                    }
+                                }
+                            }
+                        }).to("mock:processZipEntry")
                         .to("log:entry")
-                        .convertBodyTo(String.class).to("mock:processZipEntry")
                         .end()
                         .log("Done processing big file: 
${header.CamelFileName}");
             }
         };
-
     }
 
+    private void captureMemory(AtomicInteger storage, String logPrefix) {
+        try {
+            System.gc();
+            Thread.sleep(100);
+        } catch (InterruptedException e) {
+            // Ignore
+        }
+
+        Runtime runtime = Runtime.getRuntime();
+        long used = (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 
1024);
+
+        storage.set(Math.max((int) used, storage.get()));
+        LOG.info("{}: {}MB", logPrefix, used);
+    }
 }

Reply via email to