Greetings,
I am experiencing a couple of issues while using ActiveMQ 5.8. They are:
1) Redelivery policy is not being applied correctly.
2) The ActiveMQ broker is not adhering to the configured usage constraints.
The queue keeps filling up until it runs out of disk space, even though
the store limits are set.
The queues are otherwise working fine.
What am I missing here?
Much appreciated,
Jamie
Redelivery Policy
==========
Despite the redelivery delay being set to 30 seconds, redelivery is always
immediate. Here's my code:
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(TimeUnit.SECONDS.toMillis(getInitialRedeliveryDelaySecs()));
redeliveryPolicy.setRedeliveryDelay(TimeUnit.SECONDS.toMillis(getRedeliveryDelaySecs()));
redeliveryPolicy.setMaximumRedeliveryDelay(TimeUnit.SECONDS.toMillis(getMaxRedeliveryDelaySecs()));
redeliveryPolicy.setUseExponentialBackOff(isExponentialBackoff());
redeliveryPolicy.setMaximumRedeliveries(getMaxRedeliveries());
redeliveryPolicy.setBackOffMultiplier(getBackOffMultplier());
If processing fails, I call session.recover().
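To make the expected behaviour concrete, here is a minimal sketch of the delay schedule these settings are meant to produce. It is not ActiveMQ's implementation: the class and method names are hypothetical, and it assumes the first redelivery waits the initial delay and each later one multiplies the previous delay, capped at the maximum. Only the 30-second delay comes from the description above; the 300-second cap, the multiplier of 2.0 and the 5 redeliveries are made-up example values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of ActiveMQ: lists the delays (in ms)
// that the configured policy is expected to produce.
class ExpectedRedeliverySchedule {

    static List<Long> delaysMillis(long initialSecs, long maxSecs,
                                   boolean exponential, double multiplier,
                                   int maxRedeliveries) {
        long maxMillis = TimeUnit.SECONDS.toMillis(maxSecs);
        long delay = TimeUnit.SECONDS.toMillis(initialSecs);
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < maxRedeliveries; i++) {
            delays.add(delay);
            if (exponential && multiplier > 1) {
                // Assumed back-off rule: grow the previous delay, never past the cap.
                delay = Math.min((long) (delay * multiplier), maxMillis);
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        // Example values only; see the assumptions stated above.
        System.out.println(delaysMillis(30, 300, true, 2.0, 5));
        // prints [30000, 60000, 120000, 240000, 300000]
    }
}
```

With exponential back-off disabled, the same call would return the 30-second delay for every attempt, which is the behaviour expected here instead of instant redelivery.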
Here are the connection settings:
((ActiveMQConnectionFactory)connectionFactory).setAlwaysSessionAsync(false);
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(getRedeliveryPolicy());
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10000);
prefetchPolicy.setQueueBrowserPrefetch(5000);
((ActiveMQConnectionFactory)connectionFactory).setMaxThreadPoolSize(processThreads);
((ActiveMQConnectionFactory)connectionFactory).setPrefetchPolicy(prefetchPolicy);
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
connection = connectionFactory.createConnection();
connection.start();
System Usage Constraints
================
String brokerName = app.getFileSystem().getApplicationName();
brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setUseJmx(true);
brokerService.setUseLoggingForShutdownErrors(true);
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setProcessExpired(true);
strategy.setUseQueueForQueueMessages(true);
strategy.setQueuePrefix("DLQ.");
PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
PolicyEntry policy = new PolicyEntry();
policy.setProducerFlowControl(producerFlowControl);
policy.setUseCache(true);
policy.setQueue(">");
policy.setDeadLetterStrategy(strategy);
entries.add(policy);
policyMap.setPolicyEntries(entries);
brokerService.setDestinationPolicy(policyMap);
//PERSISTENCE
brokerService.setPersistent(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(kahaPath);
TFile dataDir = new TFile(app.getFileSystem().getAppQueuePath() + File.separator + "data");
brokerService.setDataDirectoryFile(dataDir);
dataDir.mkdirs();
TFile tempQueuePath = getTempQueuePath();
brokerService.setTmpDataDirectory(tempQueuePath);
SystemUsage systemUsage = brokerService.getSystemUsage();
// 1024L forces long arithmetic so large MB values cannot overflow int
systemUsage.getStoreUsage().setLimit(storeUsageMB * 1024L * 1024L);
systemUsage.getTempUsage().setLimit(tempUsageMB * 1024L * 1024L);
systemUsage.getMemoryUsage().setLimit(memoryUsageMB * 1024L * 1024L);
brokerService.setSystemUsage(systemUsage);
brokerService.setPersistenceAdapter(persistenceAdapter);
Defaults for the settings above:
protected static final String defaultStoreUsageMB = "40";
protected static final String defaultMemoryUsageMB = "32";
protected static final String defaultTempUsageMB = "40";
protected static final String defaultSendFailIfNoSpace = "no";
protected static final String defaultProducerFlowControl = "yes";
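For reference, the usage defaults above translate into byte limits as sketched below. This is a standalone illustration, not code from the application: the UsageLimits class and the mbToBytes helper are hypothetical, and it assumes the string defaults are parsed as whole megabytes before being passed to setLimit.

```java
// Hypothetical helper: converts an MB setting (stored as a String, like the
// defaults above) into the byte count expected by a usage limit.
class UsageLimits {

    static long mbToBytes(String megabytes) {
        // Long arithmetic throughout, so values above 2047 MB do not overflow.
        return Long.parseLong(megabytes.trim()) * 1024L * 1024L;
    }

    public static void main(String[] args) {
        System.out.println("store  = " + mbToBytes("40")); // store  = 41943040
        System.out.println("memory = " + mbToBytes("32")); // memory = 33554432
        System.out.println("temp   = " + mbToBytes("40")); // temp   = 41943040
    }
}
```

So with these defaults the store is expected to stop growing at roughly 40 MB on disk, which is the limit that is being exceeded.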