This is an automated email from the ASF dual-hosted git repository.
tsato pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
new bc4755c CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests
bc4755c is described below
commit bc4755c25aae841d66519525d68a57f39f441472
Author: Tadayoshi Sato <[email protected]>
AuthorDate: Thu Jul 18 13:41:33 2019 +0900
CAMEL-13734: camel-undertow - Fix one-way streaming for producer with tests
(cherry picked from commit 5190b410c85a8169ee6a7eb24db8df4ac050c979)
---
.../component/undertow/UndertowClientCallback.java | 18 ++--
.../camel/component/undertow/UndertowProducer.java | 13 ++-
.../undertow/UndertowStreamingClientCallback.java | 26 ++++-
.../undertow/UndertowHttpStreamingTest.java | 119 +++++++++++++++++++++
4 files changed, 163 insertions(+), 13 deletions(-)
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
index 21a4171..4eac8e7 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowClientCallback.java
@@ -100,15 +100,15 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
protected final ClientRequest request;
- private final ByteBuffer body;
-
- private final AsyncCallback callback;
+ protected final AsyncCallback callback;
/**
* A queue of resources that will be closed when the exchange ends, add more
* resources via {@link #deferClose(Closeable)}.
*/
- private final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>();
+ protected final BlockingDeque<Closeable> closables = new LinkedBlockingDeque<>();
+
+ private final ByteBuffer body;
private final Boolean throwExceptionOnFailure;
@@ -161,9 +161,13 @@ class UndertowClientCallback implements ClientCallback<ClientConnection> {
}
}
- void finish(final Message result) {
- for (final Closeable closeable : closables) {
- IoUtils.safeClose(closeable);
+ protected void finish(final Message result) {
+ finish(result, true);
+ }
+
+ protected void finish(final Message result, boolean close) {
+ if (close) {
+ closables.forEach(IoUtils::safeClose);
}
if (result != null) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
index 279a350..2a7a5bd 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowProducer.java
@@ -141,7 +141,8 @@ public class UndertowProducer extends DefaultAsyncProducer {
final Object body = undertowHttpBinding.toHttpRequest(request, camelExchange.getIn());
final UndertowClientCallback clientCallback;
- if (getEndpoint().isUseStreaming() && (body instanceof InputStream)) {
+ final boolean streaming = getEndpoint().isUseStreaming();
+ if (streaming && (body instanceof InputStream)) {
// For streaming, make it chunked encoding instead of specifying content length
requestHeaders.put(Headers.TRANSFER_ENCODING, "chunked");
clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
@@ -156,8 +157,14 @@ public class UndertowProducer extends DefaultAsyncProducer {
requestHeaders.put(Headers.CONTENT_LENGTH, bodyAsByte.remaining());
}
- clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
- request, bodyAsByte);
+ if (streaming) {
+ // response may receive streaming
+ clientCallback = new UndertowStreamingClientCallback(camelExchange, callback, getEndpoint(),
+ request, bodyAsByte);
+ } else {
+ clientCallback = new UndertowClientCallback(camelExchange, callback, getEndpoint(),
+ request, bodyAsByte);
+ }
}
if (log.isDebugEnabled()) {
diff --git a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
index b1e2e34..af87ef7 100644
--- a/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
+++ b/components/camel-undertow/src/main/java/org/apache/camel/component/undertow/UndertowStreamingClientCallback.java
@@ -28,7 +28,10 @@ import io.undertow.client.ClientExchange;
import io.undertow.client.ClientRequest;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
+import org.xnio.IoUtils;
import org.xnio.channels.StreamSinkChannel;
class UndertowStreamingClientCallback extends UndertowClientCallback {
@@ -37,19 +40,36 @@ class UndertowStreamingClientCallback extends UndertowClientCallback {
UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback,
UndertowEndpoint endpoint, ClientRequest request,
+ ByteBuffer body) {
+ super(exchange, callback, endpoint, request, body);
+ this.bodyStream = null;
+ }
+
+ UndertowStreamingClientCallback(Exchange exchange, AsyncCallback callback,
+ UndertowEndpoint endpoint, ClientRequest request,
InputStream body) {
super(exchange, callback, endpoint, request, null);
this.bodyStream = body;
}
@Override
- public void completed(ClientConnection connection) {
- // no connection closing registered as streaming continues downstream
- connection.sendRequest(request, on(this::performClientExchange));
+ protected void finish(Message result) {
+ boolean close = true;
+ if (result != null && result.getBody() instanceof InputStream) {
+ // no connection closing as streaming continues downstream
+ close = false;
+ }
+ finish(result, close);
}
@Override
protected void writeRequest(ClientExchange clientExchange) {
+ if (bodyStream == null) {
+ super.writeRequest(clientExchange);
+ return;
+ }
+
+ // send request stream
StreamSinkChannel requestChannel = clientExchange.getRequestChannel();
try (ReadableByteChannel source = Channels.newChannel(bodyStream)) {
IOHelper.transfer(source, requestChannel);
diff --git
a/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
new file mode 100644
index 0000000..2c1d938
--- /dev/null
+++
b/components/camel-undertow/src/test/java/org/apache/camel/component/undertow/UndertowHttpStreamingTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.undertow;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.junit.Test;
+
+public class UndertowHttpStreamingTest extends BaseUndertowTest {
+
+ private static final String LINE =
+ String.join("", Collections.nCopies(100, "0123456789"));
+ private static final long COUNT = 1000; // approx. 1MB
+
+ @Test
+ public void testTwoWayStreaming() throws Exception {
+ long expectedLength = LINE.length() * COUNT;
+ MockEndpoint mock = getMockEndpoint("mock:length");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(expectedLength);
+
+ Exchange response = template.send(
+ "undertow:http://localhost:{{port}}?useStreaming=true",
+ e -> produceStream(e));
+ consumeStream(response);
+ long length = response.getIn().getBody(Long.class).longValue();
+
+ mock.assertIsSatisfied();
+ assertEquals(expectedLength, length);
+ }
+
+ @Test
+ public void testOneWayStreaming() throws Exception {
+ long expectedLength = LINE.length() * COUNT;
+ MockEndpoint mock = getMockEndpoint("mock:length");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived(12);
+
+ Exchange response = template.send(
+ "undertow:http://localhost:{{port}}?useStreaming=true",
+ e -> { e.getIn().setBody("Hello Camel!"); });
+ consumeStream(response);
+ long length = response.getIn().getBody(Long.class).longValue();
+
+ mock.assertIsSatisfied();
+ assertEquals(expectedLength, length);
+ }
+
+ private static void produceStream(Exchange exchange) throws IOException {
+ PipedOutputStream out = new PipedOutputStream();
+ exchange.getIn().setBody(new PipedInputStream(out));
+ new Thread(() -> {
+ try (OutputStreamWriter osw = new OutputStreamWriter(out);
+ BufferedWriter writer = new BufferedWriter(osw)) {
+ LongStream.range(0, COUNT).forEach(i -> {
+ try {
+ writer.write(LINE);
+ writer.newLine();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }).start();
+ }
+
+ private static void consumeStream(Exchange exchange) throws IOException {
+ try (InputStream in = exchange.getIn().getBody(InputStream.class);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ long length = reader.lines()
+ .collect(Collectors.summingLong(String::length));
+ exchange.getIn().setBody(length);
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("undertow:http://localhost:{{port}}?useStreaming=true")
+ .process(e -> consumeStream(e))
+ .to("mock:length")
+ .process(e -> produceStream(e));
+ }
+ };
+ }
+
+}