Hello, I am in the following situation: several ActiveMQ clients will send big files. I made a simple unit test using ActiveMQInputStream/ActiveMQOutputStream, with 10 threads each sending a 100M file and one thread reading one file at a time.
If I store the 10 files sequentially, there is no trouble. If I send the files in parallel and use a selector to distinguish between files, ActiveMQInputStream locks up after receiving 1.8M. I understand this is because of the queue page size: the selector applies only to messages in the first page, so packets from the 9 other files prevent the input stream from receiving the rest of the first file.
So I created one queue per file instead. However, it exhibits the same behaviour. It looks like all my queues are sharing the same KahaDB page?
So what is the proper way to send big files over ActiveMQ without having the files block each other? Is it possible, when I create a new queue for a file, to specify that it has its own page? We cannot afford to read all the files in parallel!
If useful, see below the code used for the test. Thanks in advance for any information you could have.
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.commons.io.IOUtils; import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.Test; public class activeMQMassiveSendFilesTest { TestingConfigurator configurator; @AfterTest public void clean() { configurator.cleanup(); } @Test public void massiveTest() throws Exception { configurator = new TestingConfigurator(); configurator.init(); String[] names = new String[] { "bla", "blabla", "blablabla", "444", "uuid123", "truc", "bidule", "machin", "alpha", "beta" }; Thread[] t = new Thread[10]; class enqueuer extends Thread { private String id; public enqueuer(String id) { this.id = id; } @Override public void run() { // TODO Auto-generated method stub try { enqueueBigMessageContent(id, new MassiveInputStream( 1024 * 1024 * 100));// 100m file } catch (JMSException e) { e.printStackTrace(); } } } for (int i = 0; i < 10; i++) t[i] = new enqueuer(names[i]); for (int i = 0; i < 10; i++) t[i].start(); for (int i = 0; i < 10; i++) t[i].join(); for (String name : names) { Assert.assertTrue(dequeueBigMessageContent(name)); } } private boolean dequeueBigMessageContent(String name) throws JMSException { ActiveMQConnection connection = (ActiveMQConnection) configurator.connectionFactory .createConnection(); connection.start(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); String queueName = configurator.testQueue+"."+name; System.out.println("reading from "+queueName); Queue destination = queueSession.createQueue(queueName); InputStream is = connection.createInputStream(destination); int read=0; int count = 0; boolean ok=true; try { while ((read=is.read())>=0){ ok=ok & ( 
((count++)&0xFF)==read); if (count%(1024*1024)==0) System.out.println((count/1024/1024)+"M read"); } is.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println("read "+(((float)count)/1024.0/1024.0)+"M elements from stream"); return ok; } private void enqueueBigMessageContent(final String uid, InputStream source) throws JMSException { HashMap<String, Object> map = new HashMap<String, Object>(); map.put("fileuid", uid); // init map message ActiveMQConnection connection = (ActiveMQConnection) configurator.connectionFactory .createConnection(); connection.start(); QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); String queueName = configurator.testQueue+"."+uid; System.out.println("writing to "+queueName); Queue destination = queueSession.createQueue(queueName); OutputStream os = connection.createOutputStream(destination, map, DeliveryMode.PERSISTENT, 4, 0); try { System.out.println("start enqueuing " + uid); IOUtils.copy(source, os); os.close(); System.out.println("done enqueuing " + uid); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } connection.close(); System.out.println("connection closed for " + uid); } private static class MassiveInputStream extends InputStream { private long size; private long current; public MassiveInputStream(long size) { this.size = size; } @Override public int read() throws IOException { if (current > size) return -1; return (int) ((current++) & 0xFF); } } } -- David Delbecq ICT Institut Royal Météorologique Ext:557
smime.p7s
Description: S/MIME Cryptographic Signature