Hello everybody!

I'm having trouble getting a clustered topic to work.
I worked throw the two examples "JMS Clustered Topic Example" and "JMS Load Balanced Clustered Queue Example" but I cannot get this to work. I actually don't want to use JMS but CORE protocol. The setup is a 6 node ha cluster (3 live, 3 backup) on two machines. I know that there is no such concept like a topic in Artemis. So I create a multicast address with a queue per consumer on each of the three nodes and only one producer on one of the three nodes. What I would like to achieve is that all consumers get a copy of the messages the producer posts to the address. But only the one connected to the same cluster node is doing so. What am I missing? Below is the configuration of the first broker node. The others have identical settings besides the IP address. I have three ha replication groups to pair every live node with exactly one backup node.

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
               xmlns:xi="http://www.w3.org/2001/XInclude";
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="urn:activemq:core ">

      <name>ArtemisServer1</name>
      <persistence-enabled>true</persistence-enabled>
<max-redelivery-records>1</max-redelivery-records>
      <journal-type>ASYNCIO</journal-type>
      <paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>
      <journal-min-files>2</journal-min-files>
      <journal-pool-files>10</journal-pool-files>
<journal-device-block-size>4096</journal-device-block-size>
      <journal-file-size>10M</journal-file-size>

<journal-buffer-timeout>44000</journal-buffer-timeout>
      <journal-max-io>4096</journal-max-io>

      <connectors>
        <!-- Connector used to be announced through cluster connections and notifications -->
        <connector name="artemis">tcp://192.168.33.240:61616</connector>
      </connectors>

      <disk-scan-period>5000</disk-scan-period>
      <max-disk-usage>90</max-disk-usage>

      <critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>LOG</critical-analyzer-policy>

      <page-sync-timeout>5084000</page-sync-timeout>
      <global-max-messages>-1</global-max-messages>

      <acceptors>
         <acceptor name="artemis">tcp://192.168.33.240:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE;useEpoll=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
      </acceptors>


      <cluster-user>cluster-user</cluster-user>
<cluster-password>cluster-user-password</cluster-password>

      <broadcast-groups>
         <broadcast-group name="bg-group1">
            <group-address>231.7.7.7</group-address>
            <group-port>9876</group-port>
            <broadcast-period>5000</broadcast-period>
            <connector-ref>artemis</connector-ref>
         </broadcast-group>
      </broadcast-groups>

      <discovery-groups>
         <discovery-group name="dg-group1">
            <group-address>231.7.7.7</group-address>
            <group-port>9876</group-port>
            <refresh-timeout>10000</refresh-timeout>
         </discovery-group>
      </discovery-groups>

      <ha-policy>
        <replication>
          <master>
<check-for-live-server>true</check-for-live-server>
            <group-name>group 1</group-name>
          </master>
        </replication>
      </ha-policy>

      <cluster-connections>
         <cluster-connection name="my-cluster">
            <address>eu.inform</address>
            <connector-ref>artemis</connector-ref>
<message-load-balancing>STRICT</message-load-balancing>
            <max-hops>1</max-hops>
            <discovery-group-ref discovery-group-name="dg-group1"/>
         </cluster-connection>
      </cluster-connections>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
<max-delivery-attempts>-1</max-delivery-attempts>
<redistribution-delay>1500</redistribution-delay>

<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-addresses>true</auto-delete-addresses>

            <!-- The size of each page file -->
            <page-size-bytes>524288</page-size-bytes>

            <!-- When we start applying the address-full-policy, e.g paging -->             <!-- Both are disabled by default, which means we will use the global-max-size/global-max-messages  -->
            <max-size-bytes>1048576</max-size-bytes>
            <max-size-messages>-1</max-size-messages>

            <!-- When we read from paging into queues (memory) -->

<max-read-page-messages>-1</max-read-page-messages>
<max-read-page-bytes>20M</max-read-page-bytes>

            <!-- Limit on paging capacity before starting to throw errors -->

            <page-limit-bytes>-1</page-limit-bytes>
<page-limit-messages>-1</page-limit-messages>
          </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>
   </core>
</configuration>

On the client side I create the addresses and queues programmatically like this:

try (ClientSessionFactory factory = getClientSessionFactory(p_args);
     ClientSession session = createSession())
{
  final String queueName = ADDRESS + "." + clientId;

  // create subscription
  createQueue(session, ADDRESS, queueName, null);

  // consume messages
  try (ClientConsumer consumer = session.createConsumer(queueName))
  {
    while (!stop)
    {
      final ClientMessage msg = consumer.receive();

      LOGGER.info("Received message {} from {}", msg.getLongProperty("myCounter"), msg.getStringProperty("producedBy"));

      msg.acknowledge();
    }
  }
}
catch (final Exception e)
{
  LOGGER.error(e, "Error playing consumer");
}


private void createQueue(final ClientSession p_session, final String p_address, final String p_queueName, final String p_filter) throws Exception
{
  final AddressQuery addressQuery = p_session.addressQuery(SimpleString.toSimpleString(p_address));   final QueueQuery queueQuery = p_session.queueQuery(SimpleString.toSimpleString(p_queueName));

  if (!addressQuery.isExists())
  {
    LOGGER.info("Creating new address {}", p_address);

p_session.createAddress(SimpleString.toSimpleString(p_address), RoutingType.MULTICAST, false);
  }

  if (!queueQuery.isExists())
  {
    LOGGER.info("Creating new queue. Address={}, Name={}, Filter={}", p_address, p_queueName, p_filter);

    final QueueConfiguration conf = new QueueConfiguration(p_queueName)
                    .setAddress(p_address)
                    .setDurable(true)
                    .setTemporary(false)
                    .setFilterString(p_filter);

    p_session.createQueue(conf);
  }
}

Do I have to setup a bridge to get this running and if so how would I do this?

Kind regards

Sebastian Götz

Reply via email to