Why are consumers in other timezones a concern? The expiration timestamp is measured in milliseconds UTC, so the same instant in time will have the same value in all timezones. As long as all clocks are relatively close to being in sync, it shouldn't matter what timezone anyone's located in...
Also, your statement that the consumer is the one checking the expiration time is correct but incomplete; all brokers will also check expiration times of messages they hold, and DLQ (or delete) any that are expired. On Fri, Oct 3, 2014 at 10:08 AM, coryplusplus <coryplusp...@gmail.com> wrote: > Hi, > Currently I am using apache-activemq-5.5.1 and am having some trouble with > memory usage filling up for a particular topic. > > 2014-08-12 22:33:44,881 | INFO | Usage Manager memory limit reached. > Stopping producer (XXXXX) to prevent flooding topic://SERVER_PONG. See > http://activemq.apache.org/producer-flow-control.html for more info > (blocking for: 1s) | org.apache.activemq.broker.region.Topic | ActiveMQ > Transport: tcp:///127.0.0.1:51252 > > This blocking continues until activeMQ is restarted. > > We basically have a client/server application where activemq is hosted on > the same machine as the server. > The server has multiple components and each produce a SERVER_PONG message > which is sent in response to a SERVER_PING message from the client. Our > current theory is that somehow our client is getting into an unexpected > state where it is able to send SERVER_PING messages but unable to consume > SERVER_PONG messages. This results in the SERVER_PONG messages backing up > and eventually causing us to exhaust our memory resources as mentioned > above. > > Our first attempt at a solution was to utilize the timeToLive variable and > set it for each SERVER_PONG message that was sent. That would have made > sure > that we weren’t keeping SERVER_PONG messages around that were not > acknowledged and would stop us from flooding the broker. This however was > not a possibility as our clients can potentially be based in different time > zones than our server. I saw a lot of posts recommending the timestamp > plugin but that does not work in our case since our producer and broker are > on the same server and our consumer is the one checking the expiration time > before processing. (see > > http://activemq.2283324.n4.nabble.com/timeToLive-with-out-of-sync-clocks-td3009739.html > ) > > So the question now has become, is there a way to remove these server pong > messages that have not been acknowledged after x amount of time. Does > ActiveMQ provide an API that will allow us to check number of inFlight > messages per topic and remove them accordingly if we have too many? > > Here is our activemq.xml configuration file if needed. > > > <beans > xmlns="http://www.springframework.org/schema/beans" > xmlns:amq="http://activemq.apache.org/schema/core" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans-2.0.xsd > http://activemq.apache.org/schema/core > http://activemq.apache.org/schema/core/activemq-core.xsd"> > > > <bean > > class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> > <property name="locations"> > > <value>file:${activemq.base}/conf/credentials.properties</value> > </property> > </bean> > > > <broker xmlns="http://activemq.apache.org/schema/core" > brokerName="localhost" dataDirectory="${activemq.base}/data" > persistent="false" > > > > > <destinationPolicy> > <policyMap> > <policyEntries> > > <policyEntry > topic="SERVER_PONG.>"> > > <pendingMessageLimitStrategy> > > <constantPendingMessageLimitStrategy limit="50" /> > > </pendingMessageLimitStrategy> > > </policyEntry> > > <policyEntry > topic="FROM_STATUS_SERVER.>"> > > <pendingMessageLimitStrategy> > > <constantPendingMessageLimitStrategy limit="5000" /> > > </pendingMessageLimitStrategy> > > </policyEntry> > > <policyEntry > topic=">" producerFlowControl="false" memoryLimit="1 kb"> > > <pendingSubscriberPolicy> > > <vmCursor /> > > </pendingSubscriberPolicy> > > </policyEntry> > > <policyEntry > queue=">" producerFlowControl="false" memoryLimit="1 kb"> > > > </policyEntry> > </policyEntries> > </policyMap> > </destinationPolicy> > > > > <managementContext> > <managementContext createConnector="true"/> > </managementContext> > > > > > > <systemUsage> > <systemUsage> > <memoryUsage> > <memoryUsage limit="3072 mb"/> > </memoryUsage> > <storeUsage> > <storeUsage limit="5 gb"/> > </storeUsage> > <tempUsage> > <tempUsage limit="8 gb"/> > </tempUsage> > </systemUsage> > </systemUsage> > > > <transportConnectors> > <transportConnector name="openwire" > uri="tcp:// > 0.0.0.0:60000?transport.keepAlive=true&wireformat.maxInactivityDuration=0&transport.connectionTimeout=0 > "/> > </transportConnectors> > > </broker> > > > > </beans> > > > > > > -- > View this message in context: > http://activemq.2283324.n4.nabble.com/Remove-Unacknowledged-Heartbeat-Message-tp4686129.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. >