I have two activemq brokers networked together. The producer broker (has
static network xml tag) shows enqueued and dequeued values matching when
consumer broker consumes the message, but the dequeued value on the consumer
broker shows zero and if I rerun the consumer it receives all the messages
again. Below is all the code I'm using to test this. Why is the consumer
queue not removing the message from the queue?

// Producer activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans";
  xmlns:amq="http://activemq.apache.org/schema/core";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>      
    </bean>
    <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="transmitBroker" dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 
        <managementContext>
            <managementContext jmxDomainName="transmitDomainName"
connectorPort="1098" />
        </managementContext>

        <networkConnectors>
           <networkConnector uri="static:(tcp://xx.xxx.x.xx:61619)"
duplex="true" />
        </networkConnectors>
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/transmit"/>
        </persistenceAdapter>
        <transportConnectors>
           <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
        </transportConnectors>
    </broker>
    <import resource="jetty.xml"/>
</beans>

// MsgSenderTest.java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgSenderTest
{
   public static void main(final String[] args_)   {
      if(args_.length != 4)      {
         System.out.println("Required parameters;IP, port, Test number and
number of messages");
         System.exit(0);
      }

      final ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://" + args_[0] + ":" + args_[1]);

      System.out.println("Connecting to ActiveMQ:" +
connectionFactory.getBrokerURL());

      Connection connection = null;
      Session startTopicSession = null;
      MessageProducer startProducer = null;

      try      {
         final int numberOfMessages = Integer.parseInt(args_[3]);

         connection = connectionFactory.createConnection();
         connection.start();

         startTopicSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

         final Queue startQueue = startTopicSession.createQueue("Test" +
args_[2]);

         startProducer = startTopicSession.createProducer(startQueue);

         for(int i = 0; i < numberOfMessages; i++)
         {
            System.out.println("Sending message #" + (i + 1));
            startProducer.send(startTopicSession.createMessage());

            try            {               Thread.sleep(500);            }
            catch(final Exception e2)            {}
         }

         final Message message = startTopicSession.createMessage();

         message.setStringProperty("END", "");
         startProducer.send(message);
      }
      catch(final Exception e)      {         e.printStackTrace();      }
      finally
      {
         if(startProducer != null)
         {
            try            {               startProducer.close();           
}
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }

         if(startTopicSession != null)
         {
            try            {               startTopicSession.close();           
}
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }

         if(connection != null)
         {
            try            {               connection.close();            }
            catch(final JMSException e)            {              
e.printStackTrace();            }
         }
      }
   }
}

// Consumer activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans";
  xmlns:amq="http://activemq.apache.org/schema/core";
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>      
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core";
brokerName="receiveBroker" dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
                  <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy> 
 
        <managementContext>
            <managementContext jmxDomainName="receiveDomainName"
connectorPort="1099" />
        </managementContext>

         <networkConnectors>
         </networkConnectors>

        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/receive"/>
        </persistenceAdapter>

        <transportConnectors>
           <transportConnector name="openwire" uri="tcp://0.0.0.0:61619"/>
        </transportConnectors>
    </broker>
    <import resource="jetty.xml"/>
</beans>

// MsgListenerTest.java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MsgListenerTest implements MessageListener
{
   private final ActiveMQConnectionFactory connectionFactory;
   private Connection connection;
   private Session session;
   private MessageConsumer consumer;

   private int msgNumber = 1;

   public static void main(final String[] args_)
   {
      if(args_.length != 3){
         System.out.println("Required parameters: IP, port, test number");
         System.exit(0);
      }

      new MsgListenerTest(args_);
   }

   public MsgListenerTest(final String[] args_){
      this.connectionFactory = new ActiveMQConnectionFactory("tcp://" +
args_[0] + ":" + args_[1]);

      final String queueName = "Test" + args_[2];

      try
      {
         this.connection = this.connectionFactory.createConnection();
         this.connection.start();

         this.session = this.connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);

         this.consumer =
this.session.createConsumer(this.session.createQueue(queueName));

         this.consumer.setMessageListener(this);
      }
      catch(final Exception e){e.printStackTrace();}
   }

   @Override
   public void onMessage(final Message message_)
   {
      try
      {
         if(message_.getStringProperty("END") == null)
         {
            System.out.println("Received messaage #" + this.msgNumber);

            this.msgNumber++;
         }
         else
         {
            if(this.consumer != null)
            {
               try{this.consumer.close();}
               catch(final JMSException e){e.printStackTrace();}
            }

            if(this.session != null)
            {
               try{this.session.close();}
               catch(final JMSException e){e.printStackTrace();}
            }

            if(this.connection != null)
            {
               try{this.connection.close();}
               catch(final JMSException e) {e.printStackTrace();}
            }
         }
      }
      catch(final JMSException e2){e2.printStackTrace();}
   }
}

--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Messages-not-being-removed-from-networked-queue-tp3936864p3936864.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to