Now, it seems like the recommendation is for Master/Slave for reliable delivery, but I don't think we can do that.
We've got field laptops which may or may not have connectivity with the central office, but they want to be able to run the application and have it send messages that get delivered when the network comes up. And the client wants the central office to be able to send assignments to the field whether or not the network to the field laptop is up. So for the field laptops I've set up an embedded broker which then connects to a central office broker. The reason is that the application won't come up if there isn't a local broker that it can connect to. If the application is started w/o a local broker or a connection to the central office it doesn't come up. Plus, I wanted the local message persistence so that the user of the application can trigger messages to be sent even when the connection to the central office isn't present. In the central office we've get server processes also with embedded brokers. I've been fiddling with the setup configuration for the embedded brokers and the central office brokers and I've gotten to the point that messages from the field get sent to the central office on reconnect, but don't get sent from the central office to the field on reconnect. And everything works if everything is connected. But if the laptop isn't connected to the central office when the central office sends out assignments to the field, then the laptop will never see those messages. I've tried using durable subscription topics, I've tried setting "conduitSubscriptions" to both true and false - and I'm not sure if I should set it one way for the central office broker and the same or different for the laptop and server process embedded brokers. The network kind of looks like this: Field laptop|---------|central office broker|-----|Server Process| The central office broker is intended to be the "master", in as much as there is a master - this isn't really a master/slave setup. I can use JMX to look at the enqueue/dequeue counts to see where messages are getting hung up - but I am not ever sure why. I had thought that with durable subscriptions, the messages would get sent, but that *seems* not to be the case. I have looked through ActiveMQ bug reports and the online documentation. Of which there is a lot, but I am still quite confused. Seeing as how I'm doing the embedded broker set up in Groovy, rather than in spring or xbean - should I explicitly be using a DemandForwardingBridge as in the ThreeBroker tests? Or does the underlying code just use it anyway? What would that imply for the activemq.xml config file for the master broker? Is there configuration that explicitly says use a DemandForwardingBridge? Or is the DemandForwardingBridge not the issue? What should these values be set to? decreaseNetworkConsumerPriority="false" conduitSubscriptions="true" bridgeTempDestinations="true" duplex="true" Sorry for such a newbie set of questions and so long an email.
/* * $$ * *************************************************************************** * * Copyright (c) 2006 Coned. All rights reserved. * * Company: http://www.coned.com * *************************************************************************** */ package com.coned.util.db; import java.net.URI; import javax.sql.DataSource import org.apache.activemq.broker.BrokerService import org.apache.activemq.network.NetworkConnector import org.apache.activemq.network.DiscoveryNetworkConnector import org.apache.activemq.broker.jmx.ManagementContext import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory import org.apache.activemq.broker.view.ConnectionDotFilePlugin import org.apache.activemq.broker.view.DestinationDotFilePlugin import org.apache.derby.jdbc.EmbeddedDataSource import org.apache.log4j.Logger; import com.samsix.util.LoggerControl import com.samsix.util.HostNameProvider /** * Utility to create the jms broker. */ public class JmsBroker { private static final Logger logger = Logger.getLogger( JmsBroker ); String applicationNickname; String transportConnectorUri; String networkConnectorUri; DataSource dataSource public void createBroker() { turnOnActiveMqLogging() BrokerService broker = new BrokerService(); // configure the broker broker.brokerName = getBrokerName() broker.useJmx = true; broker.dataDirectory = getDataDirectory() // // Suggested via email. Also see: // // http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkLoadTest.java?view=markup // broker.getManagementContext().setCreateConnector( false ); //logger.error "data directory: ${getDataDirectory()}" addPlugins( broker ) addNetworkConnector( broker ) addManagementContext( broker ) addPersistenceAdaptor( broker ) broker.addConnector( transportConnectorUri ); broker.start(); } def addPersistenceAdaptor( broker ) { EmbeddedDataSource dataSource = new EmbeddedDataSource() JournalPersistenceAdapterFactory factory = new JournalPersistenceAdapterFactory() dataSource.databaseName = getDataDirectory() + File.separator + "derbydb" dataSource.createDatabase = "create" factory.journalLogFiles = 5 factory.dataDirectory = getDataDirectory() factory.dataSource = dataSource broker.persistenceFactory = factory } def addManagementContext( broker ) { ManagementContext context = new ManagementContext() context.useMBeanServer = true context.createConnector = true context.connectorPort = getJmxPort() context.jmxDomainName = "org.apache.activemq" broker.managementContext = context } def addNetworkConnector( broker ) { // NetworkConnector connector = broker.addNetworkConnector( networkConnectorUri ); NetworkConnector connector = new DiscoveryNetworkConnector( new URI( networkConnectorUri ) ) connector.name = "masterJms" connector.duplex = true connector.dynamicOnly = false // // just read something from james strachan... // "For demand forwarding I think you need to set <networkConnector conduitSubscriptions="false" ..." // // https://issues.apache.org/activemq/browse/AMQ-632;jsessionid=2E616BDE6A9E07FF60967D5A69352D1F?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel // connector.conduitSubscriptions = true connector.bridgeTempDestinations = true connector.decreaseNetworkConsumerPriority = false // // networkTTL (defaults to 1) // // the number of brokers in the network that messages // and subscriptions can pass through // // connector.networkTTL = 100 broker.addNetworkConnector( connector ) } def addPlugins( broker ) { def plugins = [] def connectionDotPlugin = new ConnectionDotFilePlugin() def destinationDotPlugin = new DestinationDotFilePlugin() connectionDotPlugin.file = "${getDataDirectory()}${File.separator}${getBrokerName()}-Connection.dot" destinationDotPlugin.file = "${getDataDirectory()}${File.separator}${getBrokerName()}-Destination.dot" // plugins << connectionDotPlugin plugins << destinationDotPlugin broker.plugins = plugins } String getDataDirectory() { "${System.properties.'java.io.tmpdir'}${File.separator}activemq${File.separator}${applicationNickname}" } def turnOnActiveMqLogging() { def loggerControl = new LoggerControl() loggerControl.setDebug( "org.apache.activemq" ) loggerControl.afterPropertiesSet() } int getJmxPort() { def tmp = transportConnectorUri.substring( transportConnectorUri.lastIndexOf( ":" ) + 1 ) // println "tmp - transport port#: [${tmp}]" Integer.parseInt( tmp.trim() ) + 100 } String getBrokerName() { "${new HostNameProvider().get()}-${applicationNickname}" } String getClientId( String topicName ) { "${new HostNameProvider().get()}-${applicationNickname}-${topicName}" } String getBrokerUri() { "vm://${getBrokerName()}" } }
activemq.xml
Description: XML document