yes. if you send asynchronously (tx or non-persist messages by default), you can enable producer window to not block the connection on producer flow control.
On Tue, Jun 25, 2013 at 4:14 AM, bizcenter <bizcenter...@gmail.com> wrote: > In org.apache.activemq.broker.region.Queue#send(ProducerBrokerExchange, > Message), it just determine that the producerWindowSize is greater than 0, > so set producerWindowSize to 1024 or 10240 can do the same effect??? am i > right? > > public void send(final ProducerBrokerExchange producerExchange, final > Message message) throws Exception { > final ConnectionContext context = > producerExchange.getConnectionContext(); > // There is delay between the client sending it and it arriving at > the > // destination.. it may have expired. > message.setRegionDestination(this); > ProducerState state = producerExchange.getProducerState(); > if (state == null) { > LOG.warn("Send failed for: " + message + ", missing producer > state for: " + producerExchange); > throw new JMSException("Cannot send message to " + > getActiveMQDestination() + " with invalid (null) producer state"); > } > final ProducerInfo producerInfo = > producerExchange.getProducerState().getInfo(); > * final boolean sendProducerAck = !message.isResponseRequired() && > producerInfo.getWindowSize() > 0* > && !context.isInRecoveryMode(); > if (message.isExpired()) { > // message not stored - or added to stats yet - so chuck here > broker.getRoot().messageExpired(context, message, null); > if (sendProducerAck) { > ProducerAck ack = new > ProducerAck(producerInfo.getProducerId(), message.getSize()); > context.getConnection().dispatchAsync(ack); > } > return; > } > if (memoryUsage.isFull()) { > isFull(context, memoryUsage); > fastProducer(context, producerInfo); > if (isProducerFlowControl() && context.isProducerFlowControl()) > { > if (warnOnProducerFlowControl) { > warnOnProducerFlowControl = false; > LOG > .info("Usage Manager Memory Limit (" > + memoryUsage.getLimit() > + ") reached on " > + > getActiveMQDestination().getQualifiedName() > + ". Producers will be throttled to the > rate at which messages are removed from this destination to prevent > flooding > it." > + " See > http://activemq.apache.org/producer-flow-control.html for more info"); > } > > if (!context.isNetworkConnection() && > systemUsage.isSendFailIfNoSpace()) { > throw new ResourceAllocationException("Usage Manager > Memory Limit reached. Stopping producer (" > + message.getProducerId() + ") to prevent > flooding " > + getActiveMQDestination().getQualifiedName() + > "." > + " See > http://activemq.apache.org/producer-flow-control.html for more info"); > } > > // We can avoid blocking due to low usage if the producer > is > // sending > // a sync message or if it is using a producer window > *if (producerInfo.getWindowSize() > 0 || > message.isResponseRequired()) {* > // copy the exchange state since the context will be > // modified while we are waiting > // for space. > final ProducerBrokerExchange producerExchangeCopy = > producerExchange.copy(); > synchronized (messagesWaitingForSpace) { > // Start flow control timeout task > // Prevent trying to start it multiple times > if (!flowControlTimeoutTask.isAlive()) { > flowControlTimeoutTask.setName(getName()+" > Producer Flow Control Timeout Task"); > flowControlTimeoutTask.start(); > } > messagesWaitingForSpace.put(message.getMessageId(), > new Runnable() { > public void run() { > > try { > // While waiting for space to free > up... > the > // message may have expired. > if (message.isExpired()) { > LOG.error("expired waiting for > space.."); > broker.messageExpired(context, > message, null); > > destinationStatistics.getExpired().increment(); > } else { > doMessageSend(producerExchangeCopy, > message); > } > > if (sendProducerAck) { > ProducerAck ack = new > ProducerAck(producerInfo.getProducerId(), message > .getSize()); > > context.getConnection().dispatchAsync(ack); > } else { > Response response = new Response(); > > response.setCorrelationId(message.getCommandId()); > > context.getConnection().dispatchAsync(response); > } > > } catch (Exception e) { > if (!sendProducerAck && > !context.isInRecoveryMode()) { > ExceptionResponse response = new > ExceptionResponse(e); > > response.setCorrelationId(message.getCommandId()); > > context.getConnection().dispatchAsync(response); > } else { > LOG.debug("unexpected exception on > deferred send of :" + message, e); > } > } > } > }); > > if (!context.isNetworkConnection() && > systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) { > flowControlTimeoutMessages.add(new > TimeoutMessage(message, context, systemUsage > .getSendFailIfNoSpaceAfterTimeout())); > } > > registerCallbackForNotFullNotification(); > context.setDontSendReponse(true); > return; > } > > } else { > > if (memoryUsage.isFull()) { > waitForSpace(context, memoryUsage, "Usage Manager > Memory Limit reached. Producer (" > + message.getProducerId() + ") stopped to > prevent flooding " > + > getActiveMQDestination().getQualifiedName() + "." > + " See > http://activemq.apache.org/producer-flow-control.html for more info"); > } > > // The usage manager could have delayed us by the time > // we unblock the message could have expired.. > if (message.isExpired()) { > if (LOG.isDebugEnabled()) { > LOG.debug("Expired message: " + message); > } > broker.getRoot().messageExpired(context, message, > null); > return; > } > } > } > } > doMessageSend(producerExchange, message); > if (sendProducerAck) { > ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), > message.getSize()); > context.getConnection().dispatchAsync(ack); > } > } > > > > -- > View this message in context: > http://activemq.2283324.n4.nabble.com/Producer-Window-Size-tp4668547.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > -- *Christian Posta* http://www.christianposta.com/blog twitter: @christianposta