Hey,
I’ve been spinning my wheels on this and was hoping someone could help me
understand the mistakes I’m making. Here’s some background:
* Using AmazonMQ for ActiveMQ
* Publisher is publishing messages to my broker using MQTT protocol using
QoS 1 (written in java)
* Subscriber is subscribed to topics using MQTT protocol with wildcard, say
`some/topic/+/item`, with QoS 1 and clean session set to false (written in
python)
Let me walk you through my particular case:
1. Subscriber S subscribes to topic “some/topic/+/item” using MQTT protocol
* QoS 1 and clean session set to false
* Granted QoS == 1
2. Publisher P publishes message to “some/topic/1/item” using MQTT protocol
for the first time
* QoS 1
3. Subscriber S receives message on topic some/topic/1/item
4. Subscriber S disconnects
5. Publisher P publishes message to “some/topic/1/item” using MQTT protocol
for the second time
6. Publisher P publishes message to “some/topic/2/item” using MQTT protocol
for the first time
7. Subscriber S reconnects and subscribes to “some/topic/+/item”
8. Subscriber S receives message on topic some/topic/1/item
9. Subscriber S DOES NOT receive message on topic some/topic/2/item
This is not the behavior I would expect, and I need to figure out how to
resolve this before our customers start subscribing to our topics. I need to
receive all new messages that match the subscription with wildcards, even
topics that don’t exist prior the subscriber disconnecting.
In the UI, I’m seeing this after completing the case above:
NAME
# OF CONSUMERS
MSGS ENQUEUED
MSGS DEQUEUED
some.topic.1.item
1
2
2
some.topic.2.item
0
0
0
I’m fairly confident I’m just missing some configuration, but I can’t seem to
find what configuration that may be. Here’s my config:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<broker schedulePeriodForDestinationPurge="10000"
xmlns=http://activemq.apache.org/schema/core>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry gcInactiveDestinations="true"
inactiveTimoutBeforeGC="600000" topic=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<policyEntry gcInactiveDestinations="true"
inactiveTimoutBeforeGC="600000" queue=">"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry myAuthEntries... />
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
</broker>
This is mostly just the defaults that AWS gave me. I tried removing the
`constantPendingMessageLimitStrategy`, as I was wondering if we were sending
thousands of messages to the advisory topics that was causing messages to get
deleted from other topics. I didn’t think this made any sense and, sure enough,
it didn’t work.
Is there anyone that can lend their expertise and help me resolve this issue?
Thanks so much!
[ReUqX18imJiQAAAAABJRU5ErkJggg==]
Jonathan Damon
Finished Product Team Captain