In org.apache.activemq.broker.region.Queue#send(ProducerBrokerExchange, Message), it just determine that the producerWindowSize is greater than 0, so set producerWindowSize to 1024 or 10240 can do the same effect??? am i right?
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { final ConnectionContext context = producerExchange.getConnectionContext(); // There is delay between the client sending it and it arriving at the // destination.. it may have expired. message.setRegionDestination(this); ProducerState state = producerExchange.getProducerState(); if (state == null) { LOG.warn("Send failed for: " + message + ", missing producer state for: " + producerExchange); throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state"); } final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); * final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0* && !context.isInRecoveryMode(); if (message.isExpired()) { // message not stored - or added to stats yet - so chuck here broker.getRoot().messageExpired(context, message, null); if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); } return; } if (memoryUsage.isFull()) { isFull(context, memoryUsage); fastProducer(context, producerInfo); if (isProducerFlowControl() && context.isProducerFlowControl()) { if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; LOG .info("Usage Manager Memory Limit (" + memoryUsage.getLimit() + ") reached on " + getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } // We can avoid blocking due to low usage if the producer is // sending // a sync message or if it is using a producer window *if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {* // copy the exchange state since the context will be // modified while we are waiting // for space. final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); synchronized (messagesWaitingForSpace) { // Start flow control timeout task // Prevent trying to start it multiple times if (!flowControlTimeoutTask.isAlive()) { flowControlTimeoutTask.setName(getName()+" Producer Flow Control Timeout Task"); flowControlTimeoutTask.start(); } messagesWaitingForSpace.put(message.getMessageId(), new Runnable() { public void run() { try { // While waiting for space to free up... the // message may have expired. if (message.isExpired()) { LOG.error("expired waiting for space.."); broker.messageExpired(context, message, null); destinationStatistics.getExpired().increment(); } else { doMessageSend(producerExchangeCopy, message); } if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message .getSize()); context.getConnection().dispatchAsync(ack); } else { Response response = new Response(); response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); } } catch (Exception e) { if (!sendProducerAck && !context.isInRecoveryMode()) { ExceptionResponse response = new ExceptionResponse(e); response.setCorrelationId(message.getCommandId()); context.getConnection().dispatchAsync(response); } else { LOG.debug("unexpected exception on deferred send of :" + message, e); } } } }); if (!context.isNetworkConnection() && systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage .getSendFailIfNoSpaceAfterTimeout())); } registerCallbackForNotFullNotification(); context.setDontSendReponse(true); return; } } else { if (memoryUsage.isFull()) { waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"); } // The usage manager could have delayed us by the time // we unblock the message could have expired.. if (message.isExpired()) { if (LOG.isDebugEnabled()) { LOG.debug("Expired message: " + message); } broker.getRoot().messageExpired(context, message, null); return; } } } } doMessageSend(producerExchange, message); if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Producer-Window-Size-tp4668547.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.