Hi, I'm using version 'apache-activemq-5.3.1', and my code is below. Maybe something is wrong with the configuration? My activemq.xml looks like this:
file:${activemq.base}/conf/credentials.properties " producerFlowControl="false" memoryLimit="1 gb"> " producerFlowControl="false" memoryLimit="1 gb"> 1. The producer import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"; private String subject = "blob.queue"; private Destination destination = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private MessageProducer producer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = (ActiveMQConnection)connectionFactory.createConnection(); session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); connection.setCopyMessageOnSend(false); } // 发送消息 public void produceMessage(String filename) throws JMSException, Exception { initialize(); File file = new File(filename); BlobMessage msg = session.createBlobMessage(file); //设置一些所发送附件的属性 msg.setStringProperty("fileName", file.getName()); msg.setLongProperty("fileSize", file.length()); connection.start(); System.out.println("Producer:->Sending message: " + file.getName()+",文件大小为:" + file.length()+"B"); //发送文件 
producer.send(msg); System.out.println("Producer:->Message sent complete!"); } // 关闭连接 public void close() throws JMSException { System.out.println("Producer:->Closing connection"); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } } 2. The consumer package blobMsg; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.apache.activemq.blob.BlobDownloadStrategy; import org.apache.activemq.blob.FTPBlobDownloadStrategy; import org.apache.activemq.command.ActiveMQBlobMessage; public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = "tcp://localhost:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/"; private String subject = "blob.queue"; private Destination destination = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private MessageConsumer consumer = null; // 初始化 private void initialize() throws JMSException, Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); connection = (ActiveMQConnection)connectionFactory.createConnection(); session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } // 消费消息 public void 
consumeMessage() throws JMSException, Exception { initialize(); connection.start(); // 开始监听 System.out.println("Consumer:->Begin listening..."); // 设置监听器 consumer.setMessageListener(this); // Message message = consumer.receive(); } // 关闭连接 public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } // 消息处理函数 public void onMessage(Message message) { try { if (message instanceof BlobMessage) { BlobMessage blobMsg = (BlobMessage) message; System.out.println("Consumer:->Received: " + blobMsg); String fileName = blobMsg.getStringProperty("fileName"); System.out.println("fileName:"+fileName); long fileSize = blobMsg.getLongProperty("fileSize"); System.out.println("fileSize:"+fileSize+"B"); InputStream inStr = blobMsg.getInputStream(); BufferedInputStream bin = new BufferedInputStream(inStr); File file = new File("d:/test"+fileName); BufferedOutputStream bout = new BufferedOutputStream(new FileOutputStream(file)); byte[] block = new byte[1024]; while (bin.read(block)!=-1) { bout.write(block); } bout.close(); bin.close(); inStr.close(); } else { System.out.println("Consumer:->Received: " + message); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } 3. The test code import javax.jms.JMSException; public class Test { public static void main(String[] args) throws JMSException, Exception { // TODO Auto-generated method stub ConsumerTool consumer = new ConsumerTool(); ProducerTool producer = new ProducerTool(); // 开始监听 consumer.consumeMessage(); // 延时500毫秒之后发送消息 Thread.sleep(500); producer.produceMessage("d:/1.doc"); producer.close(); // 延时500毫秒之后停止接受消息 Thread.sleep(500); consumer.close(); } } Dejan Bosanac wrote: > > Hi, > > what version are you using and can you post your code? 
> > Cheers > -- > Dejan Bosanac - http://twitter.com/dejanb > > Open Source Integration - http://fusesource.com/ > ActiveMQ in Action - http://www.manning.com/snyder/ > Blog - http://www.nighttale.net > > > On Wed, Apr 14, 2010 at 5:11 AM, ireneshy wrote: > >> >> Hi , >> I am trying a simple producer and consumer application for BlobMessage, >> it >> works fine for files of size 8 kb.But if I send file of size more than >> 64KB >> , it throws an exception in the activemq console : >> >> ERROR log - EXCEPTION >> java.lang.IllegalArgumentException >> at java.nio.Buffer.position(Buffer.java:218) >> at org.mortbay.io.nio.NIOBuffer.poke(NIOBuffer.java:142) >> at org.mortbay.io.AbstractBuffer.put(AbstractBuffer.java:391) >> at org.mortbay.jetty.HttpParser.reset(HttpParser.java:844) >> at >> org.mortbay.jetty.HttpConnection.destroy(HttpConnection.java:131) >> at >> >> org.mortbay.jetty.AbstractConnector.connectionClosed(AbstractConnector.java:785) >> at >> >> org.mortbay.jetty.nio.SelectChannelConnector.access$100(SelectChannelConnector.java:64) >> at >> >> org.mortbay.jetty.nio.SelectChannelConnector$1.endPointClosed(SelectChannelConnector.java:92) >> at >> >> org.mortbay.io.nio.SelectChannelEndPoint.doUpdateKey(SelectChannelEndPoint.java:382) >> at >> >> org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:337) >> at >> org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:166) >> at >> >> org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124) >> at >> >> org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:537) >> >> I've tried the code listed in the url: >> >> http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-td17731668.html#a17731668 >> >> But got the same error. Who knows how to fix the problem? 
>> >> >> -- >> View this message in context: >> http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-tp28237935p28237935.html >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >> >> > > > ----- > Dejan Bosanac > > Open Source Integration - http://fusesource.com/ > ActiveMQ in Action - http://www.manning.com/snyder/ > Blog - http://www.nighttale.net > -- View this message in context: http://old.nabble.com/Getting-IllegalAgrEception-while-Sending-BlobMessage-tp28237935p28239844.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.