This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1920a0de7e9e CAMEL-22721 Use CachedOutputStream to enable streaming of
large zip file (#21271)
1920a0de7e9e is described below
commit 1920a0de7e9ed00cf08f0ee1d1154e2fdd9ded5e
Author: lantzen <[email protected]>
AuthorDate: Fri Feb 6 10:54:15 2026 +0100
CAMEL-22721 Use CachedOutputStream to enable streaming of large zip file
(#21271)
* CAMEL-22721 Use CachedOutputStream to enable streaming of large zip files
* CAMEL-22721 Updated with feedback from PR
* CAMEL-22721 Updated with feedback from PR
* CAMEL-22721 Formatted code
---
.../camel/dataformat/zipfile/ZipIterator.java | 31 +++-
.../zipfile/ZipFileMultipleFilesSplitterTest.java | 5 +-
.../dataformat/zipfile/ZipSplitterRouteTest.java | 197 ++++++++++++++++++++-
3 files changed, 215 insertions(+), 18 deletions(-)
diff --git
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
index 195829583248..45228890dd11 100644
---
a/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
+++
b/components/camel-zipfile/src/main/java/org/apache/camel/dataformat/zipfile/ZipIterator.java
@@ -16,13 +16,20 @@
*/
package org.apache.camel.dataformat.zipfile;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Objects;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.converter.stream.CachedOutputStream;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.util.IOHelper;
import org.apache.commons.compress.archivers.ArchiveException;
@@ -44,6 +51,7 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
private boolean allowEmptyDirectory;
private volatile ZipArchiveInputStream zipInputStream;
private volatile ZipArchiveEntry currentEntry;
+ private volatile List<CachedOutputStream> cachedOutputStreamsToClose = new
ArrayList<>();
private volatile Message parent;
private volatile boolean first;
@@ -57,7 +65,7 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
zipInputStream = zipArchiveInputStream;
} else {
try {
- ArchiveInputStream input = new
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
+ ArchiveInputStream<?> input = new
ArchiveStreamFactory().createArchiveInputStream(ArchiveStreamFactory.ZIP,
new BufferedInputStream(inputStream));
zipInputStream = (ZipArchiveInputStream) input;
} catch (ArchiveException e) {
@@ -130,12 +138,13 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
currentEntry = getNextEntry();
if (currentEntry != null) {
- LOG.debug("read zipEntry {}", currentEntry.getName());
+ String zipFileName = currentEntry.getName();
+ LOG.debug("read zipEntry {}", zipFileName);
Message answer = new DefaultMessage(exchange.getContext());
answer.getHeaders().putAll(exchange.getIn().getHeaders());
- answer.setHeader("zipFileName", currentEntry.getName());
- answer.setHeader(Exchange.FILE_NAME, currentEntry.getName());
+ answer.setHeader("zipFileName", zipFileName);
+ answer.setHeader(Exchange.FILE_NAME, zipFileName);
if (currentEntry.isDirectory()) {
if (allowEmptyDirectory) {
answer.setBody(new ByteArrayInputStream(new byte[0]));
@@ -143,10 +152,10 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
return getNextElement(); // skip directory
}
} else {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- IOHelper.copy(zipInputStream, baos);
- byte[] data = baos.toByteArray();
- answer.setBody(new ByteArrayInputStream(data));
+ CachedOutputStream cos = new CachedOutputStream(exchange);
+ IOHelper.copy(zipInputStream, cos);
+ answer.setBody(cos.getInputStream());
+ cachedOutputStreamsToClose.add(cos);
}
return answer;
@@ -192,6 +201,10 @@ public class ZipIterator implements Iterator<Message>,
Closeable {
IOHelper.close(zipInputStream);
zipInputStream = null;
currentEntry = null;
+
+ for (CachedOutputStream cos : cachedOutputStreamsToClose) {
+ IOHelper.close(cos);
+ }
}
public boolean isSupportIteratorForEmptyDirectory() {
diff --git
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
index 7a466f5b1ac4..9572636ef3e1 100644
---
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
+++
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipFileMultipleFilesSplitterTest.java
@@ -22,12 +22,12 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
-public class ZipFileMultipleFilesSplitterTest extends ZipSplitterRouteTest {
+public class ZipFileMultipleFilesSplitterTest extends CamelTestSupport {
static final String PROCESSED_FILES_HEADER_NAME = "processedFiles";
- @Override
@Test
public void testSplitter() throws InterruptedException {
MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
@@ -80,5 +80,4 @@ public class ZipFileMultipleFilesSplitterTest extends
ZipSplitterRouteTest {
};
}
-
}
diff --git
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
index 7fff60823e53..fad5cbed8ec4 100644
---
a/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
+++
b/components/camel-zipfile/src/test/java/org/apache/camel/dataformat/zipfile/ZipSplitterRouteTest.java
@@ -16,36 +16,221 @@
*/
package org.apache.camel.dataformat.zipfile;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class ZipSplitterRouteTest extends CamelTestSupport {
+ private static final Logger LOG =
LoggerFactory.getLogger(ZipIterator.class);
+
+ @TempDir
+ static File testDirectory;
+
+ private final AtomicInteger memoryMbBefore = new AtomicInteger(0);
+ private final AtomicInteger maxMemoryMbInside = new AtomicInteger(0);
+
+ private static File largeTestFile = null;
+ private static Integer largeTestFileSizeMb = null;
+ private static String expectedLargeTestFileBody = null;
+
+ private File testSpoolDirectory = null;
+ private Set<String> testSpoolDirectoryFileNamesInSplit = new HashSet<>();
@Test
- public void testSplitter() throws InterruptedException {
+ @Order(1)
+ public void testSplitter() throws InterruptedException, IOException {
+ File srcFile = new
File("src/test/resources/org/apache/camel/dataformat/zipfile/data/resources.zip");
+ File testFile = new File(testDirectory, "in/" + srcFile.getName());
+
+ FileUtils.copyFile(srcFile, testFile);
+
MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
processZipEntry.expectedBodiesReceivedInAnyOrder("chau", "hi", "hola",
"another_chiau", "another_hi");
MockEndpoint.assertIsSatisfied(context);
}
+ /**
+ * Test that ZipSplitter reads all files in the zip file into
memory when Spool is Disabled in the Stream
+ * Caching Strategy
+ */
+ @Test
+ @Order(2)
+ public void testSplitterLargeFileWithoutSpoolEnabled() throws
InterruptedException, IOException {
+ File testFile = new File(testDirectory, "in/large1.zip");
+
+ memoryMbBefore.set(0);
+ maxMemoryMbInside.set(0);
+
+ FileUtils.copyFile(largeTestFile, testFile);
+
+ int diff = testSplitterLargeFile(testFile);
+
+ assertThat("Memory spike NOT detected! " + diff + "MB increased.",
diff, greaterThan(largeTestFileSizeMb));
+ }
+
+ /**
+ * Test that ZipSplitter doesn't read all files in the zip file into
memory when Spool is Enabled in the
+ * Stream Caching Strategy
+ */
+ @Test
+ @Order(3)
+ public void testSplitterLargeFileWithSpoolEnabled() throws
InterruptedException, IOException {
+ File testFile = new File(testDirectory, "in/large2.zip");
+ MockEndpoint onCompletion = getMockEndpoint("mock:onCompletion");
+
+ onCompletion.expectedMessageCount(1);
+
+ memoryMbBefore.set(0);
+ maxMemoryMbInside.set(0);
+
+ FileUtils.copyFile(largeTestFile, testFile);
+
+ testSpoolDirectoryFileNamesInSplit.clear();
+
+ int diff = testSplitterLargeFile(testFile);
+
+ assertThat("Memory spike detected! " + diff + "MB increased.", diff,
lessThan(largeTestFileSizeMb));
+
+ assertThat("No files in SpoolDirectory during split: " +
testSpoolDirectory.getPath(),
+ testSpoolDirectoryFileNamesInSplit.size(), greaterThan(0));
+ assertEquals(0, testSpoolDirectory.list().length,
+ "Files in SpoolDirectory after test: " +
testSpoolDirectory.getPath());
+ }
+
+ private int testSplitterLargeFile(File testFile) throws IOException,
FileNotFoundException, InterruptedException {
+ MockEndpoint processZipEntry = getMockEndpoint("mock:processZipEntry");
+
processZipEntry.expectedBodiesReceivedInAnyOrder(expectedLargeTestFileBody);
+ MockEndpoint.assertIsSatisfied(context);
+
+ int before = memoryMbBefore.get();
+ int inside = maxMemoryMbInside.get();
+ int diff = inside - before;
+
+ LOG.info("Memory before {}MB, inside {}MB & diff {}MB", before,
inside, diff);
+
+ return diff;
+ }
+
+ @Override
+ protected void setupResources() {
+ largeTestFile = new File(testDirectory, "large.zip");
+
+ try (OutputStream os = new FileOutputStream(largeTestFile);
+ ZipOutputStream zos = new ZipOutputStream(os)) {
+ zos.putNextEntry(new ZipEntry("test.txt"));
+ byte[] chunk = new byte[5 * 1024];
+ int bytesWritten = 0;
+ Arrays.fill(chunk, (byte) 'A');
+
+ expectedLargeTestFileBody = new String(chunk, 0, 20);
+ for (int i = 1; i <= 1024; i++) {
+ zos.write(chunk);
+ bytesWritten += chunk.length;
+ }
+ zos.closeEntry();
+ zos.flush();
+
+ largeTestFileSizeMb = bytesWritten / 1024 / 1024;
+ LOG.info("Generated large test file with entry of size {}KB",
bytesWritten / 1024);
+ } catch (IOException e) {
+ throw new RuntimeException("Generate large test file", e);
+ }
+
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext context = super.createCamelContext();
+
+ if
("testSplitterLargeFileWithSpoolEnabled()".equals(contextManagerExtension.getCurrentTestName()))
{
+ StreamCachingStrategy streamCachingStrategy =
context.getStreamCachingStrategy();
+ streamCachingStrategy.setSpoolEnabled(true);
+
+ testSpoolDirectory = new File(testDirectory, "spool");
+ testSpoolDirectory.mkdir();
+ streamCachingStrategy.setSpoolDirectory(testSpoolDirectory);
+ }
+
+ return context;
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() {
// Unzip file and Split it according to FileEntry
-
from("file:src/test/resources/org/apache/camel/dataformat/zipfile/data?delay=1000&noop=true")
- .log("Start processing big file:
${header.CamelFileName}")
- .split(new ZipSplitter()).streaming()
+ from("file:"+testDirectory.getPath()+"/in")
+ .onCompletion()
+ .to("mock:onCompletion")
+ .end()
+ .process(exchange -> captureMemory(memoryMbBefore,
"BEFORE"))
+ .log("Start processing big file: ${header.CamelFileName}")
+ .split(new ZipSplitter()).streaming()
+ .setBody().message(message -> { // Convert up to 20 bytes
of body to string
+ try {
+ InputStream is =
message.getBody(InputStream.class);
+ byte buf[] = new byte[20];
+ int bytesRead = is.read(buf);
+ if (20 == bytesRead) { // No need to compare
memory for all small files i zip file of "testSplitter()"
+ captureMemory(maxMemoryMbInside, "INSIDE");
+ }
+ return new String(buf, 0, bytesRead);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert
body to String", e);
+ } finally {
+ if (null != testSpoolDirectory &&
testSpoolDirectory.exists()) {
+ for (String fileName :
testSpoolDirectory.list()) {
+
testSpoolDirectoryFileNamesInSplit.add(fileName);
+ }
+ }
+ }
+ }).to("mock:processZipEntry")
.to("log:entry")
- .convertBodyTo(String.class).to("mock:processZipEntry")
.end()
.log("Done processing big file:
${header.CamelFileName}");
}
};
-
}
+ private void captureMemory(AtomicInteger storage, String logPrefix) {
+ try {
+ System.gc();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+
+ Runtime runtime = Runtime.getRuntime();
+ long used = (runtime.totalMemory() - runtime.freeMemory()) / (1024 *
1024);
+
+ storage.set(Math.max((int) used, storage.get()));
+ LOG.info("{}: {}MB", logPrefix, used);
+ }
}