Greetings

I am experiencing a couple of issues while using ActiveMQ 5.8. They are:

1) Redelivery policy is not being applied correctly.

2) The ActiveMQ broker is not adhering to the configured usage constraints. The queue keeps filling up until the
machine runs out of disk space, even though store limits are set.

The queues are otherwise working fine.

What am I missing here?

Much appreciated

Jamie


Redelivery Policy
=================

Although the redelivery delay is set to 30 seconds, redelivery is always immediate, so the policy does not appear to be applied. Here's my code:

RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(TimeUnit.MILLISECONDS.convert(getInitialRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setRedeliveryDelay(TimeUnit.MILLISECONDS.convert(getRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setMaximumRedeliveryDelay(TimeUnit.MILLISECONDS.convert(getMaxRedeliveryDelaySecs(), TimeUnit.SECONDS));
redeliveryPolicy.setUseExponentialBackOff(isExponentialBackoff());
redeliveryPolicy.setMaximumRedeliveries(getMaxRedeliveries());
redeliveryPolicy.setBackOffMultiplier(getBackOffMultplier());
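
To make the expected behaviour concrete, here is a minimal standalone sketch of the delay schedule these settings are meant to produce. It is a simplified model written for illustration, not ActiveMQ's own implementation; the class name, method name, and the example values (30 second delays, 300 second cap, multiplier 2.0, 5 redeliveries) are assumptions and do not come from the configuration above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Simplified model of a redelivery delay schedule (hypothetical helper, not ActiveMQ code). */
final class RedeliveryScheduleSketch {

    /** Returns the delay in milliseconds expected before each redelivery attempt. */
    static List<Long> expectedDelaysMillis(long initialDelaySecs, long delaySecs,
                                           long maxDelaySecs, boolean exponentialBackOff,
                                           double backOffMultiplier, int maxRedeliveries) {
        // The settings are held in seconds; the policy setters take milliseconds.
        long initialMs = TimeUnit.SECONDS.toMillis(initialDelaySecs);
        long delayMs = TimeUnit.SECONDS.toMillis(delaySecs);
        long maxMs = TimeUnit.SECONDS.toMillis(maxDelaySecs);

        List<Long> delays = new ArrayList<>();
        long current = initialMs;
        for (int attempt = 0; attempt < maxRedeliveries; attempt++) {
            if (attempt > 0) {
                boolean grow = exponentialBackOff && backOffMultiplier > 1 && current > 0;
                // With back-off, grow the previous delay and cap it at the maximum;
                // without it, every later attempt waits the fixed redelivery delay.
                current = grow ? Math.min((long) (current * backOffMultiplier), maxMs) : delayMs;
            }
            delays.add(current);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Assumed example values, chosen only to show the shape of the schedule.
        System.out.println(expectedDelaysMillis(30, 30, 300, true, 2.0, 5));
        // prints [30000, 60000, 120000, 240000, 300000]
    }
}
```

With exponential back-off disabled, the same model yields a constant 30000 ms before every attempt, which is the behaviour I expected to see instead of instant redelivery.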

If processing fails, I call session.recover();

Here are the connection factory settings:

((ActiveMQConnectionFactory)connectionFactory).setAlwaysSessionAsync(false);
((ActiveMQConnectionFactory)connectionFactory).setRedeliveryPolicy(getRedeliveryPolicy());
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(10000);
prefetchPolicy.setQueueBrowserPrefetch(5000);
((ActiveMQConnectionFactory)connectionFactory).setMaxThreadPoolSize(processThreads);
((ActiveMQConnectionFactory)connectionFactory).setPrefetchPolicy(prefetchPolicy);
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
connection = connectionFactory.createConnection();
connection.start();

System Usage Constraints
========================

String brokerName = app.getFileSystem().getApplicationName();
brokerService = new BrokerService();
brokerService.setBrokerName(brokerName);
brokerService.setUseJmx(true);
brokerService.setUseLoggingForShutdownErrors(true);
IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
strategy.setProcessExpired(true);
strategy.setUseQueueForQueueMessages(true);
strategy.setQueuePrefix("DLQ.");
PolicyMap policyMap = new PolicyMap();
List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
PolicyEntry policy = new PolicyEntry();
policy.setProducerFlowControl(producerFlowControl);
policy.setUseCache(true);
policy.setQueue(">");
policy.setDeadLetterStrategy(strategy);
entries.add(policy);
policyMap.setPolicyEntries(entries);
brokerService.setDestinationPolicy(policyMap);

//PERSISTENCE
brokerService.setPersistent(true);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(kahaPath);
TFile dataDir = new TFile(app.getFileSystem().getAppQueuePath() + File.separator + "data");
brokerService.setDataDirectoryFile(dataDir);
dataDir.mkdirs();
TFile tempQueuePath = getTempQueuePath();
brokerService.setTmpDataDirectory(tempQueuePath);
SystemUsage systemUsage = brokerService.getSystemUsage();
// Limits are in bytes; long arithmetic avoids int overflow for large MB values.
systemUsage.getStoreUsage().setLimit(storeUsageMB * 1024L * 1024L);
systemUsage.getTempUsage().setLimit(tempUsageMB * 1024L * 1024L);
systemUsage.getMemoryUsage().setLimit(memoryUsageMB * 1024L * 1024L);
brokerService.setSystemUsage(systemUsage);
brokerService.setPersistenceAdapter(persistenceAdapter);
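
The usage limits are configured in megabytes and converted to bytes before being handed to setLimit. The sketch below shows that conversion in isolation. The helper name is hypothetical, and the types of storeUsageMB, tempUsageMB and memoryUsageMB are not shown here, so treating them as int is an assumption. If they are int, a conversion done purely in int arithmetic wraps around at 2048 MB and above; that cannot explain the problem with the 40 MB default, but it is worth ruling out.

```java
/** Hypothetical helper for converting a megabyte setting into a byte limit. */
final class UsageLimitSketch {

    private static final long BYTES_PER_MB = 1024L * 1024L;

    /** Converts megabytes to bytes in long arithmetic, failing loudly on overflow. */
    static long megabytesToBytes(long megabytes) {
        return Math.multiplyExact(megabytes, BYTES_PER_MB);
    }

    public static void main(String[] args) {
        int storeUsageMB = 40;                              // default store limit from the settings
        System.out.println(megabytesToBytes(storeUsageMB)); // prints 41943040

        int largeMB = 2048;                                 // assumed larger value, for illustration
        System.out.println(largeMB * 1024 * 1024);          // int arithmetic wraps: prints -2147483648
        System.out.println(megabytesToBytes(largeMB));      // prints 2147483648
    }
}
```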

protected static final String defaultStoreUsageMB = "40";
protected static final String defaultMemoryUsageMB = "32";
protected static final String defaultTempUsageMB = "40";
protected static final String defaultSendFailIfNoSpace = "no";
protected static final String defaultProducerFlowControl = "yes";
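
These defaults are stored as strings and parsed elsewhere in my code. As a sketch of how the "yes"/"no" values can be turned into the booleans passed to setProducerFlowControl and similar setters, assuming a hypothetical helper named parseYesNo (the real parsing code is not shown here): note that the standard Boolean.parseBoolean only recognises "true", so it would read "yes" as false.

```java
import java.util.Locale;

/** Hypothetical parser for yes/no style settings; not part of ActiveMQ. */
final class YesNoSettingSketch {

    /** Accepts yes/no and true/false, ignoring case and surrounding whitespace. */
    static boolean parseYesNo(String value) {
        String normalized = value == null ? "" : value.trim().toLowerCase(Locale.ROOT);
        switch (normalized) {
            case "yes":
            case "true":
                return true;
            case "no":
            case "false":
                return false;
            default:
                // Reject anything else instead of silently treating it as false.
                throw new IllegalArgumentException("Expected yes/no but got: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseYesNo("yes"));           // prints true
        System.out.println(parseYesNo("no"));            // prints false
        System.out.println(Boolean.parseBoolean("yes")); // prints false
    }
}
```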
