Hello,

I am using Active MQ 5.10 within a Karaf based application and have spent the last couple of days
investigating an OOM Exception we were getting occasionally.

It turned out, that the OOM happens occasionallly when we reconnect a durable subscriber that would have a large amount of data in it even though we have specified a rather small prefetch
size for durable subscribers (5) within the broker configuration.

Further analysis showed that after closing and reconnecting the same subscriber it has a prefetch size of 1000 (the default value). That would cause an OOM if those 1000 messages exceed the
JVM heap size.

Specifying the prefetch size within the broker url from the client side works fine, but we would like to keep the setiing on the broker side as not all clients are our development responsibility.

I have attached a test case that demonstrates this observation.

Is this a know issue or should I create a JIRA ticket for this ?


Thanks and best regards
Andreas



--


   Andreas Gies

WoQ -- Way of Quality GmbH

Geschäftsführer & CTO

/eMail:/andr...@wayofquality.de <mailto:andr...@wayofquality.de>

/Tel:/ +49 151 23470823

/Fax:/ +49 1805 006534 2114

/Twitter:/ andreasgies /Skype:/ giessonic

/LinkedIn:/ <http://de.linkedin.com/pub/andreas-gies/0/594/aa5/> (http://de.linkedin.com/pub/andreas-gies/0/594/aa5/)

/Xing:/ <http://www.xing.com/profile/Andreas_Gies> (http://www.xing.com/profile/Andreas_Gies)

/Blog:/ <http://www.wayofquality.de/index.php/en/blog> (http://www.wayofquality.de/index.php/en/blog)

/Github:/ <https://github.com/atooni> (https://github.com/atooni)

/Amtsgericht Landshut:/HRB 8352//

//

/Ust.-Id.:/ DE274771254


     Haftungsausschluss

Diese Email kann vertrauliche und/oder rechtlich geschützte Informationen enthalten und ist ausschließlich für den/die benannten Adressaten bestimmt. Sollten Sie nicht der beabsichtigte Empfänger sein oder diese Email irrtümlich erhalten haben, ist es Ihnen nicht gestattet diese Mail oder einen Teil davon ohne unsere Erlaubnis zu verbreiten, zu kopieren, unbefugt weiterzuleiten oder zu behalten. Informieren Sie bitte sofort den Absender telefonisch oder per Email und löschen Sie diese Email und alle Kopien aus Ihrem System. Wir haften nicht für die Unversehrtheit von Emails, nachdem sie unseren Einflussbereich verlassen haben.


     Disclaimer

This email may contain confidential and/or privileged information and is intended solely for the attention and use of the named addressee(s). If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are not authorized to and must not disclose, copy, distribute, or retain this message or any part of it without our authority. Please contact the sender by call or reply email immediately and destroy all copies and the original message. We are not responsible for the integrity of emails after they have left our sphere of control.

//
package de.woq.test.amq

import javax.jms.{Connection, Session}

import org.apache.activemq.broker.BrokerService
import org.apache.activemq.broker.region.policy.{PolicyEntry, PolicyMap}
import org.apache.activemq.store.memory.MemoryPersistenceAdapter
import org.apache.activemq.{ActiveMQConnectionFactory, ActiveMQTopicSubscriber}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import org.slf4j.LoggerFactory

import scala.collection.convert.Wrappers._

class PrefetchSizeSpec extends WordSpec
  with BeforeAndAfterAll
  with Matchers {

  val LOGGER       = LoggerFactory.getLogger(classOf[PrefetchSizeSpec])
  val BROKERNAME   = "prefetchTest"
  val PREFETCHSIZE = 5

  lazy val brokerService = {
    val broker = new BrokerService

    broker.setBrokerName(BROKERNAME)
    // just for testing
    broker.setPersistent(false)
    broker.setPersistenceAdapter(new MemoryPersistenceAdapter)

    val policies = {
      val policyEntry = new PolicyEntry
      policyEntry.setTopic(">")
      policyEntry.setTopicPrefetch(PREFETCHSIZE)
      policyEntry.setDurableTopicPrefetch(PREFETCHSIZE)
      policyEntry.setQueuePrefetch(PREFETCHSIZE)
      policyEntry.setQueueBrowserPrefetch(PREFETCHSIZE)

      val result = new PolicyMap
      result.setPolicyEntries(SeqWrapper(Seq(policyEntry)))

      result
    }

    broker.setDestinationPolicy(policies)

    broker
  }

  val connFactory = new ActiveMQConnectionFactory(s"vm://${BROKERNAME}")

  override protected def beforeAll() {
    LOGGER.info("Starting Active MQ broker")
    brokerService.start()
    brokerService.waitUntilStarted()
    LOGGER.info("ActiveMQ broker started")
  }

  override protected def afterAll() {
    brokerService.stop()
    brokerService.waitUntilStopped()
    LOGGER.info("ActiveMQ broker stopped.")
  }

  def withSession(jmsConnection: Connection)(f: (Session => Unit)) {

    var session : Option[Session] = None
    try {
      session = Some(jmsConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE))
      f(session.get)
    } finally {
      session.foreach { s =>
        s.close() }
    }
  }

  private def connect(clientId: String) = {
    val conn = connFactory.createConnection()
    conn.setClientID(clientId)
    conn.start()

    Some(conn)
  }

  private def assertTopicPrefetch {

    connect("preftechTest").foreach { connection =>

      withSession(connection) { session =>
        LOGGER.info(s"Creating durable subscriber and checking prefetch 
size...")
        val topic = session.createTopic("foo")
        val subscriber =
          session.createDurableSubscriber(topic, 
"mySubscriber").asInstanceOf[ActiveMQTopicSubscriber]
        subscriber.getPrefetchNumber should be (PREFETCHSIZE)
      }

      connection.close()
    }

  }

  "A Durable subscription" should {

    "honor the broker defined prefetch size on reconnect" in {
      assertTopicPrefetch
      assertTopicPrefetch
    }
  }

}

Reply via email to