This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
new 4f1bfd6 CAMEL-15557: Stream Cache file not deleted (#4261)
4f1bfd6 is described below
commit 4f1bfd6e59487488517ea70d5c26d1342324eea1
Author: forsthofer <[email protected]>
AuthorDate: Mon Sep 21 06:38:15 2020 +0200
CAMEL-15557: Stream Cache file not deleted (#4261)
If you have a route with a Multicast with parallel processing and a
timeout and a sub-route in the multicast which is creating an
OutputStreamCache before the timeout and is writing to the
OutputStreamCache after the timeout then the created file is never
deleted from the file system.
Co-authored-by: Franz Forsthofer <[email protected]>
---
.../MulticastParallelTimeoutStreamCachingTest.java | 36 +++++++++++++++++++++-
.../converter/stream/FileInputStreamCache.java | 11 +++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
index b18f86b..51a9883 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FilterInputStream;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.camel.ContextTestSupport;
@@ -26,6 +27,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.converter.stream.CachedOutputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -61,7 +63,7 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
}
@Test
- public void testSendingAMessageUsingMulticastConvertsToReReadable() throws
Exception {
+ public void testCreateOutputStreamCacheAfterTimeout() throws Exception {
getMockEndpoint("mock:x").expectedBodiesReceived(bodyString);
template.sendBody("direct:a", "testMessage");
@@ -73,6 +75,15 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
File[] files = f.listFiles();
assertEquals(0, files.length);
}
+
+ @Test
+ public void
testCreateOutputStreamCacheBeforeTimeoutButWriteToOutputStreamCacheAfterTimeout()
throws Exception {
+ getMockEndpoint("mock:exception").expectedMessageCount(1);
+ getMockEndpoint("mock:y").expectedMessageCount(0);
+
+ template.sendBody("direct:b", "testMessage");
+ assertMockEndpointsSatisfied();
+ }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
@@ -91,6 +102,23 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
});
}
};
+
+ final Processor processor2 = new Processor() {
+ public void process(Exchange exchange) throws IOException {
+ // create first the OutputStreamCache and then sleep
+ CachedOutputStream outputStream = new
CachedOutputStream(exchange);
+ try {
+ // sleep for one second so that the write to the
CachedOutputStream happens after the main exchange has finished due to timeout
on the multicast
+ Thread.sleep(1000l);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("Unexpected exception", e);
+ }
+ outputStream.write(BODY);
+ Message in = exchange.getIn();
+ // use FilterInputStream to trigger streamcaching
+ in.setBody(outputStream.getInputStream());
+ }
+ };
return new RouteBuilder() {
public void configure() {
@@ -101,10 +129,16 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
context.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(false);
context.getStreamCachingStrategy().setSpoolThreshold(1l);
context.setStreamCaching(true);
+
+ onException(IOException.class).to("mock:exception");
from("direct:a").multicast().timeout(500l).parallelProcessing().to("direct:x");
from("direct:x").process(processor1).to("mock:x");
+
+
from("direct:b").multicast().timeout(500l).parallelProcessing().to("direct:y");
+
+ from("direct:y").process(processor2).to("mock:y");
}
};
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
index 1589b3b..616b081 100644
---
a/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
+++
b/core/camel-support/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
@@ -246,6 +246,17 @@ public final class FileInputStreamCache extends
InputStream implements StreamCac
if (tempFile != null) {
throw new IllegalStateException("The method
'createOutputStream' can only be called once!");
}
+ if (closedOnCompletion && exchangeCounter.get() == 0) {
+ // exchange was already stopped -> in this case the tempFile
would never be deleted.
+ // This can happen when in the splitter or Multi-cast case
with parallel processing, the CachedOutputStream is created when the main unit
of work
+ // is still active, but has a timeout and after the timeout
which stops the unit of work the FileOutputStream is created.
+ // We only can throw here an Exception and inform the user
that the processing took longer than the set timeout.
+ String error = "Cannot create a FileOutputStream for Stream
Caching, because this FileOutputStream would never be removed from the file
system."
+ + " This situation can happen with a Splitter or Multi
Cast in parallel processing if there is a timeout set on the Splitter or Multi
Cast, "
+ + " and the processing in a sub-branch takes longer
than the timeout. Consider to increase the timeout.";
+ LOG.error(error);
+ throw new IOException(error);
+ }
tempFile = FileUtil.createTempFile("cos", ".tmp",
strategy.getSpoolDirectory());
LOG.trace("Creating temporary stream cache file: {}", tempFile);