You say the consumer "just hangs in there" - where is "there"? Where specifically is the thread hung?
Also, why are you setting your message listener after you start the connection? Tim On Apr 29, 2016 7:51 AM, "opacut" <oliver.pa...@gmail.com> wrote: Hello, I am writing a sort of performance test on AMQ 5.13. The producer is working perfectly, but the consumer connects and does not receive any messages, just hangs in there. I have trouble finding the point where it breaks. The program spawns a number of worker threads that connect to the broker instance. I will copy the configuration and the entire program. I have found a similar question but it was unanswered. I am in need of assistance, thank You in advance. Oliver Pacut The critical section I think is this: @Override public void run() { try{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerAddress); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); messageConsumer = session.createConsumer(topic); } catch (JMSException e) { e.printStackTrace(); } MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { msgs++; } }; setUp = true; try{ while(!ready){ Thread.sleep(1);} } catch (InterruptedException ex) { System.out.println("You done goofed in da worka."); } long startMs; try { connection.start(); messageConsumer.setMessageListener(messageListener); while (msgs == 0){ Thread.sleep(1); } if(!first){ first = true; } startMs = System.currentTimeMillis(); while ((System.currentTimeMillis() <= (startMs + runfor))) { Thread.sleep(1); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { connection.stop(); messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } ConsumerPerformance.parent.updateMessages(msgs); } The whole configuration activemq.xml: <beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <persistenceAdapter> <kahaDB directory="/media/opacut/Windows8_OS/ubuntu/ActiveMQ"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector name="openwire" uri="tcp:// 0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600 "/> </transportConnectors> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks> </broker> <import resource="jetty.xml"/> </beans> And the consumer code: /** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicLong; import static net.sourceforge.argparse4j.impl.Arguments.store; public class ConsumerPerformance { static ConsumerPerformance parent; protected static AtomicLong totalMessages = new AtomicLong(0); protected static boolean setUp, ready, first; private final Object lock = new Object(); public void updateMessages(long msgs){ synchronized (lock){ totalMessages.addAndGet(msgs); } } public static void main(String[] args) throws Exception { parent = new ConsumerPerformance(); ArgumentParser parser = argParser(); ready = false; String topicName, address; int recordSize = 0; long start = 0, cons, runfor, end = 0; try { Namespace res = parser.parseArgs(args); /* parse args */ topicName = res.getString("topic"); recordSize = res.getInt("recordSize"); cons = res.getLong("cons"); runfor = res.getLong("runfor"); address = res.getString("broker"); ThreadGroup tg = new ThreadGroup("consumers"); String nm = "thread"; for(int i=0; i<cons; i++){ setUp = false; String threadName = nm+i; Thread thrd 
= new WorkerThread(tg, threadName, address, topicName, runfor); thrd.start(); while(!setUp){ Thread.sleep(1); } } ready = true; System.out.println("Setup finished."); while(!first){ Thread.sleep(1); } start = System.currentTimeMillis(); while(tg.activeCount() > 1){ Thread.sleep(1); } end = System.currentTimeMillis(); } catch (ArgumentParserException e) { if (args.length == 0) { parser.printHelp(); System.exit(0); } else { parser.handleError(e); System.exit(1); } } long duration = end - start; System.out.printf("Result: %d records received," + " %f records/sec (%.2f MB/sec)\n", totalMessages.get(), (totalMessages.floatValue())*1000/(float) duration, (((((float)recordSize*totalMessages.floatValue())/1024)/1024)*1000)/(float) duration); } /** Get the command-line argument parser. */ private static ArgumentParser argParser() { ArgumentParser parser = ArgumentParsers .newArgumentParser("producer-performance") .defaultHelp(true) .description("This tool is used to verify the producer performance."); parser.addArgument("--topic") .action(store()) .required(true) .type(String.class) .metavar("TOPIC") .help("produce messages to this topic"); parser.addArgument("--runfor") .action(store()) .required(false) .type(Long.class) .metavar("DURATION") .dest("runfor") .help("specify for how long the program will run"); parser.addArgument("--cons") .action(store()) .required(false) .type(Long.class) .metavar("CONSUMERS") .dest("cons") .help("specify for how many consumer threads will run"); parser.addArgument("--record-size") .action(store()) .required(true) .type(Integer.class) .metavar("RECORD-SIZE") .dest("recordSize") .help("message size in bytes"); parser.addArgument("--broker-url") .action(store()) .required(true) .type(String.class) .metavar("BROKER") .help("produce messages to this broker"); return parser; } private static class WorkerThread extends Thread { private String brokerAddress; private String topicName; private long runfor; private long msgs = 0; private Connection 
connection; private Session session; private MessageConsumer messageConsumer; public WorkerThread(ThreadGroup group, String nm, String address, String tpc, long runf) { super(group, nm); topicName = tpc; runfor = runf; brokerAddress = (address == null) ? ActiveMQConnectionFactory.DEFAULT_BROKER_URL : address; } @Override public void run() { try{ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerAddress); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(topicName); messageConsumer = session.createConsumer(topic); } catch (JMSException e) { e.printStackTrace(); } MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { msgs++; } }; setUp = true; try{ while(!ready){ Thread.sleep(1);} } catch (InterruptedException ex) { System.out.println("You done goofed in da worka."); } long startMs; try { connection.start(); messageConsumer.setMessageListener(messageListener); while (msgs == 0){ Thread.sleep(1); } if(!first){ first = true; } startMs = System.currentTimeMillis(); while ((System.currentTimeMillis() <= (startMs + runfor))) { Thread.sleep(1); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { connection.stop(); messageConsumer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } ConsumerPerformance.parent.updateMessages(msgs); } } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Consumer-not-receiving-anything-tp4711362.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.