Now, it seems like the recommendation is for Master/Slave for reliable 
delivery, but I don't think we can do that.

We've got field laptops which may or may not have connectivity with the central 
office, but they want to be able to run the application and have it send 
messages that get delivered when the network comes up.

And the client wants the central office to be able to send assignments to the 
field whether or not the network to the field laptop is up.

So for the field laptops I've set up an embedded broker which then connects to 
a central office broker.   The reason is that the application won't come up if 
there isn't a local broker that it can connect to.  If the application is 
started w/o a local broker or a connection to the central office it doesn't 
come 
up.   Plus, I wanted the local message persistence so that the user of the 
application can trigger messages to be sent even when the connection to the 
central office isn't present.

In the central office we've get server processes also with embedded brokers.

I've been fiddling with the setup configuration for the embedded brokers and 
the 
central office brokers and I've gotten to the point that messages from the 
field 
get sent to the central office on reconnect, but don't get sent from the 
central 
office to the field on reconnect.

And everything works if everything is connected.   But if the laptop isn't 
connected to the central office when the central office sends out assignments 
to 
the field, then the laptop will never see those messages.

I've tried using durable subscription topics, I've tried setting 
"conduitSubscriptions" to both true and false - and I'm not sure if I should 
set it one way for the central office broker and the same or different for the 
laptop and server process embedded brokers.

The network kind of looks like this:

Field laptop|---------|central office broker|-----|Server Process|

The central office broker is intended to be the "master", in as much as there 
is 
a master - this isn't really a master/slave setup.

I can use JMX to look at the enqueue/dequeue counts to see where messages are 
getting hung up - but I am not ever sure why.

I had thought that with durable subscriptions, the messages would get sent, 
but that *seems* not to be the case.

I have looked through ActiveMQ bug reports and the online documentation.  Of 
which there is a lot, but I am still quite confused.

Seeing as how I'm doing the embedded broker set up in Groovy, rather than in 
spring or xbean - should I explicitly be using a DemandForwardingBridge as in 
the ThreeBroker tests?   Or does the underlying code just use it anyway?

What would that imply for the activemq.xml config file for the master broker?   
Is there configuration that explicitly says use a DemandForwardingBridge?  Or 
is the DemandForwardingBridge not the issue?

What should these values be set to?

                        decreaseNetworkConsumerPriority="false"
                        conduitSubscriptions="true"
                        bridgeTempDestinations="true"
                        duplex="true"

Sorry for such a newbie set of questions and so long an email.
/*
 * $$
 *
 ***************************************************************************
 *
 * Copyright (c) 2006 Coned.  All rights reserved.
 *
 * Company:      http://www.coned.com
 *
 ***************************************************************************
 */
package com.coned.util.db;


import java.net.URI;
import javax.sql.DataSource

import org.apache.activemq.broker.BrokerService
import org.apache.activemq.network.NetworkConnector
import org.apache.activemq.network.DiscoveryNetworkConnector
import org.apache.activemq.broker.jmx.ManagementContext
import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory
import org.apache.activemq.broker.view.ConnectionDotFilePlugin
import org.apache.activemq.broker.view.DestinationDotFilePlugin

import org.apache.derby.jdbc.EmbeddedDataSource

import org.apache.log4j.Logger;

import com.samsix.util.LoggerControl
import com.samsix.util.HostNameProvider



/**
 *    Utility to create the jms broker.
 */
public class JmsBroker
{
    private static final Logger logger = Logger.getLogger( JmsBroker );


    String          applicationNickname;
    String          transportConnectorUri;
    String          networkConnectorUri;
    DataSource      dataSource


    public void createBroker()
    {
        turnOnActiveMqLogging()

        BrokerService broker = new BrokerService();

        // configure the broker
        broker.brokerName       = getBrokerName()
        broker.useJmx           = true;
        broker.dataDirectory    = getDataDirectory()

        //
        //      Suggested via email.  Also see:
        //
        //      http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkLoadTest.java?view=markup
        //
        broker.getManagementContext().setCreateConnector( false );

//logger.error "data directory: ${getDataDirectory()}"

        addPlugins( broker )
        addNetworkConnector( broker )
        addManagementContext( broker )
        addPersistenceAdaptor( broker )

        broker.addConnector( transportConnectorUri );
        broker.start();
    }


    def addPersistenceAdaptor( broker )
    {
        EmbeddedDataSource                  dataSource  = new EmbeddedDataSource()
        JournalPersistenceAdapterFactory    factory     = new JournalPersistenceAdapterFactory()

        dataSource.databaseName     = getDataDirectory() + File.separator + "derbydb"
        dataSource.createDatabase   = "create"
        factory.journalLogFiles     = 5
        factory.dataDirectory       = getDataDirectory()
        factory.dataSource          = dataSource

        broker.persistenceFactory   = factory
    }


    def addManagementContext( broker )
    {
        ManagementContext context = new ManagementContext()
        context.useMBeanServer  = true
        context.createConnector = true
        context.connectorPort   = getJmxPort()
        context.jmxDomainName   = "org.apache.activemq"

        broker.managementContext = context
    }


    def addNetworkConnector( broker )
    {
//        NetworkConnector connector = broker.addNetworkConnector( networkConnectorUri );

        NetworkConnector connector = new DiscoveryNetworkConnector( new URI( networkConnectorUri ) )
        connector.name                              = "masterJms"
        connector.duplex                            = true
        connector.dynamicOnly                       = false

        //
        //      just read something from james strachan...
        //      "For demand forwarding I think you need to set <networkConnector conduitSubscriptions="false" ..."
        //
        //      https://issues.apache.org/activemq/browse/AMQ-632;jsessionid=2E616BDE6A9E07FF60967D5A69352D1F?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
        //
        connector.conduitSubscriptions              = true


        connector.bridgeTempDestinations            = true
        connector.decreaseNetworkConsumerPriority   = false

        //
        //    networkTTL (defaults to 1)
        //
        //      the number of brokers in the network that messages
        //      and subscriptions can pass through
        //
        //
        connector.networkTTL = 100

        broker.addNetworkConnector( connector )
    }


    def addPlugins( broker )
    {
        def plugins = []
        def connectionDotPlugin     = new ConnectionDotFilePlugin()
        def destinationDotPlugin    = new DestinationDotFilePlugin()

        connectionDotPlugin.file    = "${getDataDirectory()}${File.separator}${getBrokerName()}-Connection.dot"
        destinationDotPlugin.file   = "${getDataDirectory()}${File.separator}${getBrokerName()}-Destination.dot"

//        plugins << connectionDotPlugin
        plugins << destinationDotPlugin

        broker.plugins = plugins
    }


    String getDataDirectory()
    {
        "${System.properties.'java.io.tmpdir'}${File.separator}activemq${File.separator}${applicationNickname}"
    }


    def turnOnActiveMqLogging()
    {
        def loggerControl = new LoggerControl()
        loggerControl.setDebug( "org.apache.activemq" )
        loggerControl.afterPropertiesSet()
    }


    int getJmxPort()
    {
        def tmp = transportConnectorUri.substring( transportConnectorUri.lastIndexOf( ":" ) + 1 )
// println "tmp - transport port#: [${tmp}]"

        Integer.parseInt( tmp.trim() ) + 100
    }


    String getBrokerName()
    {
        "${new HostNameProvider().get()}-${applicationNickname}"
    }


    String getClientId( String  topicName )
    {
        "${new HostNameProvider().get()}-${applicationNickname}-${topicName}"
    }


    String getBrokerUri()
    {
        "vm://${getBrokerName()}"
    }
}

Attachment: activemq.xml
Description: XML document

Reply via email to