Hello,

I am writing a sort of performance test on AMQ 5.13. The producer is working
perfectly, but the consumer connects and does not receive any messages, just
hangs in there. I have trouble finding the point where it breaks. The
program spawns a number of worker threads that connect to the broker
instance. I will copy the configuration and the entire program. I have found
a similar question but it was unanswered.

I am in need of assistance, thank You in advance.

Oliver Pacut



The critical section I think is this:

@Override
public void run() {
    try{
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory(brokerAddress);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        messageConsumer = session.createConsumer(topic);
    } catch (JMSException e) {
        e.printStackTrace();
    }

    MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            msgs++;
        }
    };

    setUp = true;

    try{
        while(!ready){ Thread.sleep(1);}
    } catch (InterruptedException ex) {
        System.out.println("You done goofed in da worka.");
    }

    long startMs;

    try {
        connection.start();
        messageConsumer.setMessageListener(messageListener);

        while (msgs == 0){
            Thread.sleep(1);
        }

        if(!first){
            first = true;
        }
        startMs = System.currentTimeMillis();

        while ((System.currentTimeMillis() <= (startMs + runfor))) {
            Thread.sleep(1);
        }


    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            connection.stop();
            messageConsumer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    ConsumerPerformance.parent.updateMessages(msgs);

}


The whole configuration activemq.xml:



<beans
  xmlns="http://www.springframework.org/schema/beans";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>

    
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    
    <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        
        <persistenceAdapter>
           <kahaDB directory="/media/opacut/Windows8_OS/ubuntu/ActiveMQ"/>
        </persistenceAdapter>

          
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        
        <transportConnectors>
            
            <transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
       
        </transportConnectors>

        
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans";
class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    
    <import resource="jetty.xml"/>

</beans>


And the consumer code:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
 * specific language governing permissions and limitations under the
License.
 */

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicLong;

import static net.sourceforge.argparse4j.impl.Arguments.store;

public class ConsumerPerformance {
    static ConsumerPerformance parent;
    protected static AtomicLong totalMessages = new AtomicLong(0);
    protected static boolean setUp, ready, first;
    private final Object lock = new Object();

    public void updateMessages(long msgs){
        synchronized (lock){
            totalMessages.addAndGet(msgs);
        }
    }

    public static void main(String[] args) throws Exception {
        parent = new ConsumerPerformance();
        ArgumentParser parser = argParser();
        ready = false;

        String topicName, address;
        int recordSize = 0;
        long start = 0, cons, runfor, end = 0;

        try {
            Namespace res = parser.parseArgs(args);

            /* parse args */
            topicName = res.getString("topic");
            recordSize = res.getInt("recordSize");
            cons = res.getLong("cons");
            runfor = res.getLong("runfor");
            address = res.getString("broker");

            ThreadGroup tg = new ThreadGroup("consumers");

            String nm = "thread";
            for(int i=0; i<cons; i++){
                setUp = false;
                String threadName = nm+i;
                Thread thrd = new WorkerThread(tg, threadName, address,
topicName,
                        runfor);
                thrd.start();
                while(!setUp){
                    Thread.sleep(1);
                }

            }
            ready = true;
            System.out.println(&quot;Setup finished.&quot;);

            while(!first){
                Thread.sleep(1);
            }

            start = System.currentTimeMillis();
            while(tg.activeCount() > 1){
                Thread.sleep(1);
            }
            end = System.currentTimeMillis();

        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                System.exit(0);
            } else {
                parser.handleError(e);
                System.exit(1);
            }
        }
        long duration = end - start;
        System.out.printf("Result:  %d records received," +
                        " %f records/sec (%.2f MB/sec)\n",
                totalMessages.get(),
                (totalMessages.floatValue())*1000/(float) duration,
               
(((((float)recordSize*totalMessages.floatValue())/1024)/1024)*1000)/(float)
duration);

    }

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
                .newArgumentParser("producer-performance")
                .defaultHelp(true)
                .description("This tool is used to verify the producer
performance.");

        parser.addArgument("--topic")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("TOPIC")
                .help("produce messages to this topic");

        parser.addArgument("--runfor")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("DURATION")
                .dest("runfor")
                .help("specify for how long the program will run");

        parser.addArgument("--cons")
                .action(store())
                .required(false)
                .type(Long.class)
                .metavar("CONSUMERS")
                .dest("cons")
                .help("specify for how many consumer threads will run");

        parser.addArgument("--record-size")
                .action(store())
                .required(true)
                .type(Integer.class)
                .metavar("RECORD-SIZE")
                .dest("recordSize")
                .help("message size in bytes");

        parser.addArgument("--broker-url")
                .action(store())
                .required(true)
                .type(String.class)
                .metavar("BROKER")
                .help("produce messages to this broker");

        return parser;
    }

    private static class WorkerThread extends Thread {
        private String brokerAddress;
        private String topicName;
        private long runfor;

        private long msgs = 0;

        private Connection connection;
        private Session session;
        private MessageConsumer messageConsumer;

        public WorkerThread(ThreadGroup group, String nm, String address,
String tpc,
                            long runf) {
            super(group, nm);
            topicName = tpc;
            runfor = runf;
            brokerAddress = (address == null) ?
ActiveMQConnectionFactory.DEFAULT_BROKER_URL : address;
        }

        @Override
        public void run() {
            try{
                ConnectionFactory connectionFactory = new
                        ActiveMQConnectionFactory(brokerAddress);
                connection = connectionFactory.createConnection();
                session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic(topicName);
                messageConsumer = session.createConsumer(topic);
            } catch (JMSException e) {
                e.printStackTrace();
            }

            MessageListener messageListener = new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    msgs++;
                }
            };

            setUp = true;

            try{
                while(!ready){ Thread.sleep(1);}
            } catch (InterruptedException ex) {
                System.out.println("You done goofed in da worka.");
            }

            long startMs;

            try {
                connection.start();
                messageConsumer.setMessageListener(messageListener);

                while (msgs == 0){
                    Thread.sleep(1);
                }

                if(!first){
                    first = true;
                }
                startMs = System.currentTimeMillis();

                while ((System.currentTimeMillis() <= (startMs + runfor))) {
                    Thread.sleep(1);
                }


            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.stop();
                    messageConsumer.close();
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            ConsumerPerformance.parent.updateMessages(msgs);

        }
    }

}



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Consumer-not-receiving-anything-tp4711362.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to