Hi There,


I keep getting this:

2008-01-08 13:16:02,466 [main] FATAL ActiveMQTest ActiveMQTest.java (98) - Data source rejected establishment of connection, message from server: "Too many connections"


It happens to both JMS and Stomp clients where the persistent header is set.

Notes before long listings:

* It only happens with MySQL
* It appears to work with PostgreSQL
* It also works with the default storage system [whatever that is]

The most reliable way for me to reproduce this error is to configure ActiveMQ to use MySQL via JDBC (or a journalled JDBC), run the first Java listing once with 98 messages and then run it again. I'll get an exception when I try to run it again.
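
For reference, the message count per run of the first Java listing (ActiveMQTest) can be worked out as below. This is only a sketch of the arithmetic: the class and method names are made up for illustration and are not part of the listings. It assumes the listing sends one "reset:::" message, then max numbered messages, then one "check:::" message.

```java
/** Hypothetical helper, not part of the listings: message arithmetic for ActiveMQTest. */
public class MessageCountSketch {

    // One "reset:::" message before the loop and one "check:::" message after it.
    static final int CONTROL_MESSAGES = 2;

    /** Messages enqueued by one run started with -Dmax=max. */
    static int messagesPerRun(int max) {
        return max + CONTROL_MESSAGES;
    }

    /** Value to pass as -Dmax so that one run enqueues exactly target messages. */
    static int maxForTarget(int target) {
        if (target < CONTROL_MESSAGES) {
            throw new IllegalArgumentException("target must be at least " + CONTROL_MESSAGES);
        }
        return target - CONTROL_MESSAGES;
    }

    public static void main(String[] args) {
        System.out.println(messagesPerRun(98));  // prints 100
        System.out.println(maxForTarget(100));   // prints 98
    }
}
```

So a run with -Dmax=98 puts 100 messages on the queue, and the second such run starts with 100 messages already stored.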

Postgres on the other hand, whilst it wasn't particularly "fast", kept on processing messages; I manually kept re-running 100 or more messages until I stacked 50 000 messages in.

Any ideas would be appreciated as we'd prefer to keep our MySQL databases for when we want a persistent store (standard operating environment reasons).

DSL

===
PROGRAM LISTINGS AND MACHINE SPECS AS FOLLOWS (Long)

I have:

  SunOS isengaard 5.10 Generic_120012-14 i86pc i386 i86pc
  java full version "1.5.0_12-b04"
  MySQL server version: 5.0.45

The machine I'm running on is an Intel Quad Core E6600 with 4 GB of memory and a modern hard drive.

On the broker side, the failure is logged as:

WARN JDBCPersistenceAdapter - Old message cleanup failed due to: java.io.IOException: Data source rejected establishment of connection, message from server: "Too many connections"
java.io.IOException: Data source rejected establishment of connection, message from server: "Too many connections"
        at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:45)
        at org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:61)
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:570)
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:213)
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$1.run(JDBCPersistenceAdapter.java:187)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:142)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:166)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)

My configuration is as follows:



<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.org/config/1.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.org/config/1.0 http://activemq.apache.org/schema/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker xmlns="http://activemq.org/config/1.0" brokerName="localhost" dataDirectory="${activemq.base}/data">
    <transportConnectors>
       <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
       <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
       <transportConnector name="xmpp"    uri="xmpp://localhost:61222"/>
    </transportConnectors>

    <networkConnectors>
      <networkConnector name="default-nc" uri="multicast://default"/>
    </networkConnectors>


    <persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
    </persistenceAdapter>

    <managementContext>
      <managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>
    </managementContext>

  </broker>

  <!-- lets create a command agent to respond to message based admin commands on the ActiveMQ.Agent topic -->
  <commandAgent xmlns="http://activemq.org/config/1.0"/>


  <!-- An embedded servlet engine for serving up the Admin console -->
  <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
    <connectors>
      <nioConnector port="8161" />
    </connectors>

    <handlers>
      <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
    </handlers>
  </jetty>

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="maxActive" value="5"/>
    <property name="password" value="passw0rd"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

</beans>
<!-- END SNIPPET: example -->

I've tried fiddling with maxActive (from setting it really high, to setting it to -1, which means "as many as you want" according to the DBCP docs). Neither made any difference.
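
As a rough illustration of what maxActive is supposed to bound, here is a toy model in plain Java. It is not DBCP and does not talk to MySQL: the class name, the method name and the server limit of 100 are all assumptions made up for this sketch. It only models a pool that opens one connection per concurrent borrower until it reaches its own cap (a negative cap meaning "no limit") or the server's cap.

```java
/** Toy model only: not DBCP, not MySQL. All names and numbers are illustrative. */
public class PoolCapSketch {

    /**
     * Returns true if the modelled server would reject a connection.
     *
     * maxActive           pool cap; a negative value means "no limit"
     * serverLimit         how many connections the server accepts in total
     * concurrentBorrowers callers that each want a connection at the same time
     */
    static boolean serverRejects(int maxActive, int serverLimit, int concurrentBorrowers) {
        int opened = 0;
        for (int i = 0; i < concurrentBorrowers; i++) {
            if (maxActive >= 0 && opened >= maxActive) {
                // Bounded pool is full: this caller waits instead of opening a new connection.
                continue;
            }
            if (opened >= serverLimit) {
                // The pool tries to open one more connection than the server allows.
                return true;
            }
            opened++;
        }
        return false;
    }

    public static void main(String[] args) {
        int serverLimit = 100; // assumed figure for the sketch
        System.out.println(serverRejects(5, serverLimit, 200));   // prints false
        System.out.println(serverRejects(-1, serverLimit, 200));  // prints true
        System.out.println(serverRejects(150, serverLimit, 200)); // prints true
    }
}
```

If this model is anywhere near right, a cap of 5 should keep the pool well under the server's limit by itself, so seeing "Too many connections" with that setting would suggest the limit is being reached some other way. That is an inference from the toy model, not something confirmed against ActiveMQ or DBCP.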

I don't seem to be able to diagnose what is actually wrong or why it's happening. The things I have noticed are:

1. This program will make it fail after about 50 messages:

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package au.com.adam.activemqtest;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/**
 *
 * @author lloy0076
 *
 * If you set max to N - 2, you should get N messages in the queue:
 * one reset message, N - 2 numbered messages and one check message.
 * Set max by using -Dmax=<value> on the Java command line.
 */
public class ActiveMQTest {

    Logger logger = Logger.getLogger(ActiveMQTest.class);
    private int max = 0;
    private int count = 0;
    private String queue_name = "";
    private final int DEFAULT_MAX = 100;
    private final String QUEUE_NAME = "activemq";

    public static void main(String[] args) {
        ActiveMQTest instance = new ActiveMQTest();

        instance.logger.debug("Starting test...");
        instance.getConfiguration();
        instance.sendMessages();
    }

    private void getConfiguration() {
        int local_max = DEFAULT_MAX;

        if (System.getProperty("max") != null) {
            local_max = Integer.valueOf(System.getProperty("max")).intValue();
        }

        logger.debug("Setting max to " + local_max);
        setMax(local_max);

        String local_queue_name = QUEUE_NAME;

        if (System.getProperty("queue_name") != null) {
            local_queue_name = System.getProperty("queue_name");
        }

        logger.debug("Setting queue name to " + local_queue_name);
        setQueue_name(local_queue_name);
    }

    private void sendMessages() {
        Connection conn = null;
        try {
            long start_time = System.currentTimeMillis();
            logger.debug("Start time: " + start_time);

            String user = ActiveMQConnection.DEFAULT_USER;
            String password = ActiveMQConnection.DEFAULT_PASSWORD;
            String broker_url = ActiveMQConnection.DEFAULT_BROKER_URL;

            logger.info("Connecting to: " + broker_url + ".");
            logger.info("Connecting as: " + user);
            logger.info("Password is: " + password);

            logger.info("Sending " + getMax() + " messages.");

            ActiveMQConnectionFactory connection_factory = new ActiveMQConnectionFactory("", "", broker_url);
            conn = connection_factory.createConnection();
            conn.start();

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(getQueue_name());
            MessageProducer producer = session.createProducer(destination);

            TextMessage expectation_message = session.createTextMessage("reset:::" + getMax());
            producer.send(expectation_message);

            for (int i = 0; i < getMax(); i++) {
                TextMessage message = session.createTextMessage(Integer.toString(i));
                producer.send(message);
            }

            TextMessage check_message = session.createTextMessage("check:::");
            producer.send(check_message);

            long end_time = System.currentTimeMillis();
            long duration = end_time - start_time;
            logger.debug("End time: " + end_time);
            logger.debug("Duration: " + duration);
        } catch (JMSException ex) {
            logger.fatal(ex.getMessage());
        } finally {
            if (conn != null) {
                try {
                    logger.debug("Closing connection...");
                    conn.close();
                } catch (Throwable ex) {
                    logger.fatal(ex.getMessage());
                }
            }
        }
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public String getQueue_name() {
        return queue_name;
    }

    public void setQueue_name(String queue_name) {
        this.queue_name = queue_name;
    }
}

2. Stomp (from Perl's Net::Stomp or Gozirra's Stomp) doesn't seem to trigger the error:

#!/usr/bin/perl

use strict;
use warnings;

use Carp;
use Data::Dumper;

use Net::Stomp;

# send messages to the queue 'foo'
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );

# Two less to cope with the messages *outside* of the while loop; 1000
# messages should be in the queue when this finishes processing.
my $max = 998;
my $count = 0;

$stomp->send( { destination => '/queue/foo', body => "reset:::$max", });

while ($count < $max) {
    # Double quotes so that $count is interpolated into the message body.
    $stomp->send( { destination => '/queue/foo', body => "count::$count", persistent => 'true', } );
    print "x" if ($count % 500 == 0);
    $count++;
    print "\n" if ($count % 5000 == 0);
}

$stomp->send( { destination => '/queue/foo', body => "check:::$count" } );


NOTE:

Non-persistent stomp messages go through fine:

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package au.com.adam.gozirrastomp;

import java.io.IOException;
import java.util.logging.Level;
import javax.security.auth.login.LoginException;
import net.ser1.stomp.Client;
import org.apache.log4j.Logger;

/**
 *
 * @author lloy0076
 */
public class GozirraMain {

    Logger logger = Logger.getLogger(GozirraMain.class);
    private int max = 0;
    private int count = 0;
    private String queue_name = "";
    private final int DEFAULT_MAX = 100;
    private final String QUEUE_NAME = "/queue/foo";

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        GozirraMain instance = new GozirraMain();

        instance.getConfiguration();
        instance.sendMessages();

        System.exit(0);
    }

    private void getConfiguration() {
        int local_max = DEFAULT_MAX;

        if (System.getProperty("max") != null) {
            local_max = Integer.valueOf(System.getProperty("max")).intValue();
        }

        logger.debug("Setting max to " + local_max);
        setMax(local_max);

        String local_queue_name = QUEUE_NAME;

        if (System.getProperty("queue_name") != null) {
            local_queue_name = System.getProperty("queue_name");
        }

        logger.debug("Setting queue name to " + local_queue_name);
        setQueue_name(local_queue_name);
    }

    private void sendMessages() {
        try {
            long start_time = System.currentTimeMillis();
            logger.debug("Start time: " + start_time);

            String user = null;
            String password = null;
            String broker_url = "localhost";
            int port = 61613;

            logger.info("Connecting to: " + broker_url + ".");
            logger.info("Port: " + port);
            logger.info("Connecting as: " + user);
            logger.info("Password is: " + password);

            logger.info("Sending " + getMax() + " messages.");

            Client c = new Client(broker_url, port, "", "");

            c.send(getQueue_name(), "reset:::" + getMax());

            int i = 0;
            for (i = 0; i <= getMax(); i++) {
                c.send(getQueue_name(), "count:::" + i);
            }

            c.send(getQueue_name(), "check");

            logger.info("Sent " + i + " messages.");

            long end_time = System.currentTimeMillis();
            long duration = end_time - start_time;
            logger.debug("End time: " + end_time);
            logger.debug("Duration: " + duration);
        } catch (IOException ex) {
            java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE, null, ex);
        } catch (LoginException ex) {
            java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public String getQueue_name() {
        return queue_name;
    }

    public void setQueue_name(String queue_name) {
        this.queue_name = queue_name;
    }
}
