Hello, I am writing a sort of performance test on AMQ 5.13. The producer is working perfectly, but the consumer connects and does not receive any messages, just hangs in there. I have trouble finding the point where it breaks. The program spawns a number of worker threads that connect to the broker instance. I will copy the configuration and the entire program. I have found a similar question but it was unanswered.
I am in need of assistance, thank You in advance. Oliver Pacut The critical section I think is this: @Override public void run() { try{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerAddress); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); messageConsumer = session.createConsumer(topic); } catch (JMSException e) { e.printStackTrace(); } MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { msgs++; } }; setUp = true; try{ while(!ready){ Thread.sleep(1);} } catch (InterruptedException ex) { System.out.println("You done goofed in da worka."); } long startMs; try { connection.start(); messageConsumer.setMessageListener(messageListener); while (msgs == 0){ Thread.sleep(1); } if(!first){ first = true; } startMs = System.currentTimeMillis(); while ((System.currentTimeMillis() <= (startMs + runfor))) { Thread.sleep(1); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { connection.stop(); messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } ConsumerPerformance.parent.updateMessages(msgs); } The whole configuration activemq.xml: <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <persistenceAdapter> <kahaDB directory="/media/opacut/Windows8_OS/ubuntu/ActiveMQ"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks> </broker> <import resource="jetty.xml"/> </beans> And the consumer code: /** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicLong; import static net.sourceforge.argparse4j.impl.Arguments.store; public class ConsumerPerformance { static ConsumerPerformance parent; protected static AtomicLong totalMessages = new AtomicLong(0); protected static boolean setUp, ready, first; private final Object lock = new Object(); public void updateMessages(long msgs){ synchronized (lock){ totalMessages.addAndGet(msgs); } } public static void main(String[] args) throws Exception { parent = new ConsumerPerformance(); ArgumentParser parser = argParser(); ready = false; String topicName, address; int recordSize = 0; long start = 0, cons, runfor, end = 0; try { Namespace res = parser.parseArgs(args); /* parse args */ topicName = res.getString("topic"); recordSize = res.getInt("recordSize"); cons = res.getLong("cons"); runfor = res.getLong("runfor"); address = res.getString("broker"); ThreadGroup tg = new ThreadGroup("consumers"); String nm = "thread"; for(int i=0; i<cons; i++){ setUp = false; String threadName = nm+i; Thread thrd = new WorkerThread(tg, threadName, address, topicName, runfor); thrd.start(); while(!setUp){ Thread.sleep(1); } } ready = true; System.out.println("Setup finished."); while(!first){ Thread.sleep(1); } start = System.currentTimeMillis(); while(tg.activeCount() > 1){ Thread.sleep(1); } end = System.currentTimeMillis(); } catch (ArgumentParserException e) { if (args.length == 0) { parser.printHelp(); System.exit(0); } else { parser.handleError(e); System.exit(1); } } long duration = end - start; System.out.printf("Result: %d records received," + " %f records/sec (%.2f MB/sec)\n", totalMessages.get(), (totalMessages.floatValue())*1000/(float) duration, (((((float)recordSize*totalMessages.floatValue())/1024)/1024)*1000)/(float) duration); } /** Get the command-line argument parser. */ private static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers .newArgumentParser("producer-performance") .defaultHelp(true) .description("This tool is used to verify the producer performance."); parser.addArgument("--topic") .action(store()) .required(true) .type(String.class) .metavar("TOPIC") .help("produce messages to this topic"); parser.addArgument("--runfor") .action(store()) .required(false) .type(Long.class) .metavar("DURATION") .dest("runfor") .help("specify for how long the program will run"); parser.addArgument("--cons") .action(store()) .required(false) .type(Long.class) .metavar("CONSUMERS") .dest("cons") .help("specify for how many consumer threads will run"); parser.addArgument("--record-size") .action(store()) .required(true) .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") .help("message size in bytes"); parser.addArgument("--broker-url") .action(store()) .required(true) .type(String.class) .metavar("BROKER") .help("produce messages to this broker"); return parser; } private static class WorkerThread extends Thread { private String brokerAddress; private String topicName; private long runfor; private long msgs = 0; private Connection connection; private Session session; private MessageConsumer messageConsumer; public WorkerThread(ThreadGroup group, String nm, String address, String tpc, long runf) { super(group, nm); topicName = tpc; runfor = runf; brokerAddress = (address == null) ? ActiveMQConnectionFactory.DEFAULT_BROKER_URL : address; } @Override public void run() { try{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerAddress); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); messageConsumer = session.createConsumer(topic); } catch (JMSException e) { e.printStackTrace(); } MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { msgs++; } }; setUp = true; try{ while(!ready){ Thread.sleep(1);} } catch (InterruptedException ex) { System.out.println("You done goofed in da worka."); } long startMs; try { connection.start(); messageConsumer.setMessageListener(messageListener); while (msgs == 0){ Thread.sleep(1); } if(!first){ first = true; } startMs = System.currentTimeMillis(); while ((System.currentTimeMillis() <= (startMs + runfor))) { Thread.sleep(1); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { connection.stop(); messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } ConsumerPerformance.parent.updateMessages(msgs); } } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Consumer-not-receiving-anything-tp4711362.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.