This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b61b8cc56718d4774eedb57b8cc23363c386f57f
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Oct 20 14:15:18 2017 +0200

    CAMEL-11931: camel-jms - Add better support for Stream JMS message type
---
 .../org/apache/camel/component/jms/JmsBinding.java | 43 ++++++++++-
 .../component/jms/StreamMessageInputStream.java    | 90 ++++++++++++++++++++++
 .../component/jms/JmsStreamMessageTypeTest.java    | 78 +++++++++++++++++++
 3 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 6b2a9a2..3a8439f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
@@ -41,6 +42,8 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
 import org.w3c.dom.Node;
 
 import org.apache.camel.CamelContext;
@@ -154,7 +157,8 @@ public class JmsBinding {
                 return createByteArrayFromBytesMessage((BytesMessage)message);
             } else if (message instanceof StreamMessage) {
                 LOG.trace("Extracting body as a StreamMessage from JMS 
message: {}", message);
-                return message;
+                StreamMessage streamMessage = (StreamMessage)message;
+                return createInputStreamFromStreamMessage(streamMessage);
             } else {
                 return null;
             }
@@ -237,6 +241,10 @@ public class JmsBinding {
         return result;
     }
 
+    /**
+     * Exposes the body of a JMS stream message as a regular {@link InputStream}.
+     *
+     * @param message the JMS stream message to read from
+     * @return a new {@link StreamMessageInputStream} wrapping the given message
+     */
+    protected InputStream createInputStreamFromStreamMessage(StreamMessage message) {
+        InputStream stream = new StreamMessageInputStream(message);
+        return stream;
+    }
+
     /**
      * Creates a JMS message from the Camel exchange and message
      *
@@ -597,7 +605,7 @@ public class JmsBinding {
             }
             return message;
         }
-        case Object:
+        case Object: {
             ObjectMessage message = session.createObjectMessage();
             if (body != null) {
                 try {
@@ -611,6 +619,37 @@ public class JmsBinding {
                 }
             }
             return message;
+        }
+        case Stream: {
+            StreamMessage message = session.createStreamMessage();
+            if (body != null) {
+                long size = 0;
+                try {
+                    LOG.trace("Writing payload in StreamMessage");
+                    InputStream is = 
context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, 
body);
+                    // assume streaming is bigger payload so use same buffer 
size as the file component
+                    byte[] buffer = new byte[FileUtil.BUFFER_SIZE];
+                    int len = 0;
+                    int count = 0;
+                    while (len >= 0) {
+                        count++;
+                        len = is.read(buffer);
+                        if (len >= 0) {
+                            size += len;
+                            LOG.trace("Writing payload chunk {} as bytes in 
StreamMessage", count);
+                            message.writeBytes(buffer, 0, len);
+                        }
+                    }
+                    LOG.trace("Finished writing payload (size {}) as bytes in 
StreamMessage", size);
+                } catch (NoTypeConversionAvailableException | IOException e) {
+                    // cannot convert to inputstream then thrown an exception 
to avoid sending a null message
+                    JMSException cause = new 
MessageFormatException(e.getMessage());
+                    cause.initCause(e);
+                    throw cause;
+                }
+            }
+            return message;
+        }
         default:
             break;
         }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
new file mode 100644
index 0000000..0d75ec7
--- /dev/null
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.StreamMessage;
+
+/**
+ * An {@link InputStream} view of the bytes carried by a JMS {@link StreamMessage}.
+ * <p/>
+ * All read operations delegate to {@link StreamMessage#readBytes(byte[])}. A
+ * {@link MessageEOFException} is reported as end of stream (<tt>-1</tt>) and any
+ * other {@link JMSException} is rethrown wrapped in an {@link IOException}.
+ */
+public class StreamMessageInputStream extends InputStream {
+
+    private final StreamMessage message;
+    // set once a read reported no more data; used by available()
+    private volatile boolean eof;
+
+    /**
+     * @param message the JMS stream message to read the bytes from
+     */
+    public StreamMessageInputStream(StreamMessage message) {
+        this.message = message;
+    }
+
+    /**
+     * Reads a single byte.
+     *
+     * @return the byte as an unsigned value (<tt>0-255</tt>), or <tt>-1</tt> at end of stream
+     * @throws IOException if the underlying JMS message cannot be read
+     */
+    @Override
+    public int read() throws IOException {
+        byte[] single = new byte[1];
+        int num;
+        do {
+            // a result of 0 means no byte was delivered by this call, so ask again
+            num = read(single, 0, 1);
+        } while (num == 0);
+        // return the byte value itself (unsigned), not the number of bytes read
+        return num < 0 ? -1 : single[0] & 0xFF;
+    }
+
+    /**
+     * Reads up to <tt>array.length</tt> bytes into the given array.
+     *
+     * @return the number of bytes read, or <tt>-1</tt> at end of stream
+     * @throws IOException if the underlying JMS message cannot be read
+     */
+    @Override
+    public int read(byte[] array) throws IOException {
+        return read(array, 0, array.length);
+    }
+
+    /**
+     * Reads up to <tt>len</tt> bytes into <tt>array</tt> starting at index <tt>off</tt>.
+     *
+     * @param array the destination buffer
+     * @param off   the index in <tt>array</tt> of the first byte to write
+     * @param len   the maximum number of bytes to read
+     * @return the number of bytes read, or <tt>-1</tt> at end of stream
+     * @throws IndexOutOfBoundsException if <tt>off</tt> and <tt>len</tt> do not fit in <tt>array</tt>
+     * @throws IOException if the underlying JMS message cannot be read
+     */
+    @Override
+    public int read(byte[] array, int off, int len) throws IOException {
+        if (off < 0 || len < 0 || len > array.length - off) {
+            throw new IndexOutOfBoundsException("off: " + off + ", len: " + len + ", array length: " + array.length);
+        }
+        if (len == 0) {
+            return 0;
+        }
+        try {
+            int num;
+            if (off == 0 && len == array.length) {
+                // the whole array is the target so read straight into it
+                num = message.readBytes(array);
+            } else {
+                // readBytes can only fill an array from index 0, so read into a
+                // temporary buffer of the requested size and copy it into place
+                byte[] chunk = new byte[len];
+                num = message.readBytes(chunk);
+                if (num > 0) {
+                    System.arraycopy(chunk, 0, array, off, num);
+                }
+            }
+            // NOTE(review): a negative result is treated as end of stream; confirm against the
+            // StreamMessage#readBytes contract whether more fields can follow a -1 result
+            eof = num < 0;
+            return num;
+        } catch (MessageEOFException e) {
+            eof = true;
+            return -1;
+        } catch (JMSException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Resets the underlying JMS message and clears the end-of-stream marker.
+     *
+     * @throws IOException if the underlying JMS message cannot be reset
+     */
+    @Override
+    public synchronized void reset() throws IOException {
+        try {
+            message.reset();
+            // data can be read again after a reset, so available() must not keep reporting 0
+            eof = false;
+        } catch (JMSException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        // if we are end of file then there is no more data, otherwise assume there is at least one more byte
+        return eof ? 0 : 1;
+    }
+}
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
new file mode 100644
index 0000000..d9ee858
--- /dev/null
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import java.io.File;
+import java.io.InputStream;
+import javax.jms.ConnectionFactory;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.junit.Test;
+
+import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
+
+/**
+ * Sends a file through a JMS queue using the <tt>Stream</tt> JMS message type and
+ * reads the received body back as an {@link InputStream}.
+ */
+public class JmsStreamMessageTypeTest extends CamelTestSupport {
+
+    @Override
+    public void setUp() throws Exception {
+        // remove the working directory before the context is started
+        deleteDirectory("target/stream");
+        super.setUp();
+    }
+
+    /**
+     * Creates the context and registers the <tt>jms</tt> component on it.
+     */
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext answer = super.createCamelContext();
+
+        ConnectionFactory factory = CamelJmsTestHelper.createConnectionFactory();
+        answer.addComponent("jms", jmsComponentAutoAcknowledge(factory));
+        return answer;
+    }
+
+    @Test
+    public void testStreamType() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        // drop the test file into the directory polled by the first route
+        File source = new File("src/test/data/message1.xml");
+        File target = new File("target/stream/in/message1.xml");
+        FileUtil.copyFile(source, target);
+
+        assertMockEndpointsSatisfied();
+
+        // the received body must be convertible to an input stream
+        InputStream body = getMockEndpoint("mock:result").getReceivedExchanges().get(0).getIn().getBody(InputStream.class);
+        assertNotNull(body);
+        String xml = context.getTypeConverter().convertTo(String.class, body);
+
+        System.out.println(xml);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // file -> jms queue, sent as a JMS StreamMessage
+                from("file:target/stream/in")
+                    .to("jms:queue:foo?jmsMessageType=Stream");
+
+                // jms queue -> file, then the mock endpoint
+                from("jms:queue:foo")
+                    .to("file:target/stream/out")
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to