Hi There,
I keep getting this:
2008-01-08 13:16:02,466 [main] FATAL ActiveMQTest ActiveMQTest.java (98)
- Data source rejected establishment of connection, message from
server: "Too many connections"
It happens to both JMS and Stomp clients where the persistent header is set.
Notes before long listings:
* It only happens to MySQL
* It appears to work with Postgresql
* It also works with the default storage system [whatever that is]
The most reliable way for me to reproduce this error is to configure
ActiveMQ to use MySQL via JDBC (or a journalled JDBC), run the first
Java listing once with 98 messages and then run it again. I'll get an
exception when I try to run it again.
Postgres on the other hand, whilst it wasn't particularly "fast", kept
on processing messages; I manually kept re-running 100 or more messages
until I stacked 50 000 messages in.
Any ideas would be appreciated as we'd prefer to keep our MySQL
databases for when we want a persistent store (standard operating
environment reasons).
DSL
===
PROGRAM LISTINGS AND MACHINE SPECS AS FOLLOWS (Long)
I have:
SunOS isengaard 5.10 Generic_120012-14 i86pc i386 i86pc
java full version "1.5.0_12-b04"
MySQL server version: 5.0.45
The machine I'm running on is an Intel Quad Core E6600 with 4Gb of
memory and modern hard drive.
...which is logged as:
WARN JDBCPersistenceAdapter - Old message cleanup failed due
to: java.io.IOException: Data source rejected establishment of
connection, message from server: "Too many connections"
java.io.IOException: Data source rejected establishment of connection,
message from server: "Too many connections"
at
org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:45)
at
org.apache.activemq.store.jdbc.TransactionContext.getConnection(TransactionContext.java:61)
at
org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:570)
at
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:213)
at
org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$1.run(JDBCPersistenceAdapter.java:187)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
at
java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:142)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:166)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
at java.lang.Thread.run(Thread.java:595)
My configuration is as follows:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.org/config/1.0
http://activemq.apache.org/schema/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
<!-- Allows us to use system properties as variables in this
configuration file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="http://activemq.org/config/1.0" brokerName="localhost"
dataDirectory="${activemq.base}/data">
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"
discoveryUri="multicast://default"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
</transportConnectors>
<networkConnectors>
<networkConnector name="default-nc" uri="multicast://default"/>
</networkConnectors>
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<managementContext>
<managementContext connectorPort="1099"
jmxDomainName="org.apache.activemq"/>
</managementContext>
</broker>
<!-- lets create a command agent to respond to message based admin
commands on the ActiveMQ.Agent topic -->
<commandAgent xmlns="http://activemq.org/config/1.0"/>
<!-- An embedded servlet engine for serving up the Admin console -->
<jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
<connectors>
<nioConnector port="8161" />
</connectors>
<handlers>
<webAppContext contextPath="/admin"
resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true" />
</handlers>
</jetty>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="maxActive" value="5"/>
<property name="password" value="passw0rd"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
<!-- END SNIPPET: example -->
I've tried fiddling with maxActive (from setting it really high, to
setting it to -1 which means "as many as you want" according to "DBCP"
docs).
I don't seem to be able to diagnose what is actually wrong or why it's
happening. The things I have noticed are:
1. This program will make it fail after about 50 messages:
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package au.com.adam.activemqtest;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
/**
*
* @author lloy0076
*
* If you set N - 3 messages, you should get N messages in the
* queue. Set N by using the -Dmax=N on the Java commandline.
*/
public class ActiveMQTest {
Logger logger = Logger.getLogger(ActiveMQTest.class);
private int max = 0;
private int count = 0;
private String queue_name = "";
private final int DEFAULT_MAX = 100;
private final String QUEUE_NAME = "activemq";
public static void main(String[] args) {
ActiveMQTest instance = new ActiveMQTest();
instance.logger.debug("Starting test...");
instance.getConfiguration();
instance.sendMessages();
}
private void getConfiguration() {
int local_max = DEFAULT_MAX;
if (System.getProperty("max") != null) {
local_max =
Integer.valueOf(System.getProperty("max")).intValue();
}
logger.debug("Setting max to " + local_max);
setMax(local_max);
String local_queue_name = QUEUE_NAME;
if (System.getProperty("queue_name") != null) {
local_queue_name = System.getProperty("queue_name");
}
logger.debug("Setting queue name to " + local_queue_name);
setQueue_name(local_queue_name);
}
private void sendMessages() {
Connection conn = null;
try {
long start_time = System.currentTimeMillis();
logger.debug("Start time: " + start_time);
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String broker_url = ActiveMQConnection.DEFAULT_BROKER_URL;
logger.info("Connecting to: " + broker_url + ".");
logger.info("Connecting as: " + user);
logger.info("Password is: " + password);
logger.info("Sending " + getMax() + " messages.");
ActiveMQConnectionFactory connection_factory = new
ActiveMQConnectionFactory("", "", broker_url);
conn = connection_factory.createConnection();
conn.start();
Session session = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(getQueue_name());
MessageProducer producer = session.createProducer(destination);
TextMessage expectation_message =
session.createTextMessage("reset:::" + getMax());
producer.send(expectation_message);
for (int i = 0; i < getMax(); i++) {
TextMessage message =
session.createTextMessage(Integer.toString(i));
producer.send(message);
}
TextMessage check_message =
session.createTextMessage("check:::");
producer.send(check_message);
long end_time = System.currentTimeMillis();
long duration = end_time - start_time;
logger.debug("End time: " + end_time);
logger.debug("Duaration: " + duration);
} catch (JMSException ex) {
logger.fatal(ex.getMessage());
} finally {
if (conn != null) {
try {
logger.debug("Closing connection...");
conn.close();
} catch (Throwable ex) {
logger.fatal(ex.getMessage());
}
}
}
}
public int getMax() {
return max;
}
public void setMax(int max) {
this.max = max;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public String getQueue_name() {
return queue_name;
}
public void setQueue_name(String queue_name) {
this.queue_name = queue_name;
}
}
2. Stomp (from Net::Perl or Gozirra's Stomp) don't seem to trigger the error
#!/usr/bin/perl
use strict;
use warnings;
use Carp;
use Data::Dumper;
use Net::Stomp;
# subscribe to messages from the queue 'foo'
use Net::Stomp;
my $stomp = Net::Stomp->new( { hostname => 'localhost', port => '61613' } );
$stomp->connect( { login => 'hello', passcode => 'there' } );
# Two less to cope with the messages *outside* of the while loop; 1000
# messages should be in the queue when this finishes processing.
my $max = 998;
my $count = 0;
$stomp->send( { destination => '/queue/foo', body => "reset:::$max", });
while ($count < $max) {
$stomp->send( { destination => '/queue/foo', body =>
'count::$count', persistent => 'true', } );
print "x" if ($count %500 == 0);
$count++;
print "\n" if ($count % 5000 == 0);
}
$stomp->send( { destination => '/queue/foo', body => 'check:::$count' } );
NOTE:
Non-persistent stomp messages go through fine:
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package au.com.adam.gozirrastomp;
import java.io.IOException;
import java.util.logging.Level;
import javax.security.auth.login.LoginException;
import net.ser1.stomp.Client;
import org.apache.log4j.Logger;
/**
*
* @author lloy0076
*/
public class GozirraMain {
Logger logger = Logger.getLogger(GozirraMain.class);
private int max = 0;
private int count = 0;
private String queue_name = "";
private final int DEFAULT_MAX = 100;
private final String QUEUE_NAME = "/queue/foo";
/**
* @param args the command line arguments
*/
public static void main(String[] args) {
GozirraMain instance = new GozirraMain();
instance.getConfiguration();
instance.sendMessages();
System.exit(0);
}
private void getConfiguration() {
int local_max = DEFAULT_MAX;
if (System.getProperty("max") != null) {
local_max =
Integer.valueOf(System.getProperty("max")).intValue();
}
logger.debug("Setting max to " + local_max);
setMax(local_max);
String local_queue_name = QUEUE_NAME;
if (System.getProperty("queue_name") != null) {
local_queue_name = System.getProperty("queue_name");
}
logger.debug("Setting queue name to " + local_queue_name);
setQueue_name(local_queue_name);
}
private void sendMessages() {
try {
long start_time = System.currentTimeMillis();
logger.debug("Start time: " + start_time);
String user = null;
String password = null;
String broker_url = "localhost";
int port = 61613;
logger.info("Connecting to: " + broker_url + ".");
logger.info("Port: " + port);
logger.info("Connecting as: " + user);
logger.info("Password is: " + password);
logger.info("Sending " + getMax() + " messages.");
Client c = new Client(broker_url, port, "", "");
c.send(getQueue_name(), "reset:::" + getMax());
int i = 0;
for (i = 0; i <= getMax(); i++) {
c.send(getQueue_name(), "count:::" + i);
}
c.send(getQueue_name(), "check");
logger.info("Sent " + i + " messages.");
long end_time = System.currentTimeMillis();
long duration = end_time - start_time;
logger.debug("End time: " + end_time);
logger.debug("Duaration: " + duration);
} catch (IOException ex) {
java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE,
null, ex);
} catch (LoginException ex) {
java.util.logging.Logger.getLogger(GozirraMain.class.getName()).log(Level.SEVERE,
null, ex);
}
}
public int getMax() {
return max;
}
public void setMax(int max) {
this.max = max;
}
public int getCount() {
return count;
}
public void setCount(int count) {
this.count = count;
}
public String getQueue_name() {
return queue_name;
}
public void setQueue_name(String queue_name) {
this.queue_name = queue_name;
}
}