Actually we do have a JIRA tracking this issue: https://issues.apache.org/jira/browse/KAFKA-998
And BTW, any review comments are welcome :)

Guozhang

On Sat, Aug 24, 2013 at 8:25 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> >> Ok, but perhaps the producer will handle something like this in the
> future?
>
> Yes, I think we need a JIRA for this.
>
> >> UnretryableFailedToSendMessageException (wraps the root cause)
> NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> final attempt)
>
> Something like this makes sense. Would you mind creating a JIRA for this
> so we can discuss a solution there?
>
> Thanks,
> Neha
>
> On Sat, Aug 24, 2013 at 10:41 AM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Thanks Neha,
> >
> > On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > >> I gathered from one of your previous responses, that a
> > > MessageSizeTooLargeException can be rectified with a smaller batch size.
> > > If so, does that imply that the message size limit is measured on the
> > > broker by the cumulative size of the batch, and not of any one message?
> > >
> > > That's right. The broker does the message size check on the compressed
> > > message. The size of the compressed message is proportional to the batch
> > > size. Hence, reducing the batch size on a retry might make sense here,
> > > but currently the producer doesn't do this.
> >
> > Ok, but perhaps the producer will handle something like this in the future?
> >
> > > >> If I want to implement guaranteed delivery semantics, using the new
> > > request.required.acks configuration, I need to expose retry logic beyond
> > > that built into the producer?
> > >
> > > The kafka producer must handle recoverable exceptions with a configurable
> > > number of retries and must not retry on unrecoverable exceptions. So
> > > ideally you shouldn't have to write your own batching and retry logic.
> >
> > So, it seems there might be a bit of a gray area.
> > There is a configurable retry count, which we can increase perhaps to gain
> > confidence that anything recoverable has been sent. But, since this retry
> > count is finite, there's no way to know for sure that it won't succeed if
> > it were retried just one more time. So, it is then difficult to conclude
> > that if Producer.send throws a FailedToSendMessageException, the message
> > shouldn't be retried.
> >
> > Perhaps it would be useful to define different exception types, so that a
> > caller can have clearer semantics:
> >
> > UnretryableFailedToSendMessageException (wraps the root cause)
> > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> > final attempt)
> >
> > Probably shorter names are possible here! Perhaps these could be
> > subclasses of FailedToSendMessageException. Alternately,
> > FailedToSendMessageException could include information, such as the number
> > of retries attempted, and a flag indicating whether it's possible to retry
> > the message.
> >
> > Jason
> >
> > > On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <j...@squareup.com> wrote:
> > >
> > > > Jun,
> > > >
> > > > Thanks, this is helpful.
> > > >
> > > > So, can QueueFullException occur in either sync or async mode (or just
> > > > async mode)?
> > > >
> > > > If there's a MessageSizeTooLargeException, is there any visibility of
> > > > this to the caller? Or will it just be a FailedToSendMessageException. I
> > > > gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException can be rectified with a smaller batch size.
> > > > If so, does that imply that the message size limit is measured on the
> > > > broker by the cumulative size of the batch, and not of any one message?
> > > > (makes sense if the broker doesn't unwrap a batch of messages before
> > > > storing on the server).
> > > >
> > > > If I want to implement guaranteed delivery semantics, using the new
> > > > request.required.acks configuration, I need to expose retry logic beyond
> > > > that built into the producer? And to do this, I need to indicate to the
> > > > caller whether it's possible to retry, or whether it will be fruitless. I
> > > > suppose allowing message.max.send.retries to allow infinite retries (e.g.
> > > > by setting it to -1) might be useful. But optionally, I'd like the caller
> > > > to be able to handle this retry logic itself.
> > > >
> > > > Jason
> > > >
> > > > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > You don't need to restart the producer. The producer currently handles
> > > > > all error/exceptions by refreshing the metadata and retrying. If it
> > > > > fails all retries, it throws a FailedToSendMessageException to the
> > > > > caller (in sync mode). The original cause is not included in this
> > > > > exception. We have thought about being a bit smarter in the producer
> > > > > retry logic such that it only retries on recoverable errors and could
> > > > > implement this at some point. Other than FailedToSendMessageException,
> > > > > the producer can also throw QueueFullException. This is an indication
> > > > > that the producer is sending data at a rate faster than the broker can
> > > > > handle. This may or may not be recoverable since it depends on the load.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <j...@squareup.com> wrote:
> > > > >
> > > > > > Jun,
> > > > > >
> > > > > > There are several others I've seen that I would have thought would be
> > > > > > retryable (possibly after an exponential backoff delay).
> > > > > > I'm curious about:
> > > > > >
> > > > > > BrokerNotAvailableException
> > > > > > FailedToSendMessageException
> > > > > > QueueFullException (happens if producerType is 'async')
> > > > > > KafkaException (this seems to wrap lots of base conditions, does one
> > > > > > have to sort through the different wrapped exception types?)
> > > > > > LeaderNotAvailableException
> > > > > > MessageSizeTooLargeException (does a batch of messages get treated as a
> > > > > > single message, when checking for message size too large?)
> > > > > > ReplicaNotAvailableException
> > > > > > UnavailableProducerException
> > > > > > UnknownException
> > > > > >
> > > > > > Also, what about my first question, regarding whether it makes sense to
> > > > > > refresh a producer by closing it and restarting it after a failure?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jason
> > > > > >
> > > > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > >
> > > > > > > For the most part, only SocketExceptions and
> > > > > > > NotLeaderForPartitionException are recoverable.
> > > > > > > MessageSizeTooLargeException may be recoverable with a smaller
> > > > > > > batch size.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <j...@squareup.com> wrote:
> > > > > > >
> > > > > > > > I'm using the kafka.javaapi.producer.Producer class from a java
> > > > > > > > client. I'm wondering if it ever makes sense to refresh a producer
> > > > > > > > by stopping it and creating a new one, for example in response to a
> > > > > > > > downstream IO error (e.g. a broker got restarted, or a stale
> > > > > > > > socket, etc.).
> > > > > > > >
> > > > > > > > Or should it always be safe to rely on the producer's
> > > > > > > > implementation to manage it's pool of BlockingChannel connections,
> > > > > > > > etc.
> > > > > > > >
> > > > > > > > I'm also interested in trying to understand which exceptions
> > > > > > > > indicate a failed send() request might be retryable (basically
> > > > > > > > anything that doesn't involve a data-dependent problem, like a
> > > > > > > > malformed message, or a message too large, etc.).
> > > > > > > >
> > > > > > > > Unfortunately, the range of Exceptions that can be thrown by the
> > > > > > > > various javaapi methods is not yet well documented. It would be
> > > > > > > > nice to have some notion of whether an exception is the result of a
> > > > > > > > data error, or a transient downstream connection error, etc.
> > > > > > > >
> > > > > > > > Jason
> > >
> > > So, can QueueFullException occur in either sync or async mode (or just
> > > async mode)?
> > >
> > > If there's a MessageSizeTooLargeException, is there any visibility of
> > > this to the caller? Or will it just be a FailedToSendMessageException. I
> > > gathered from one of your previous responses, that a
> > > MessageSizeTooLargeException can be rectified with a smaller batch size.
> > > If so, does that imply that the message size limit is measured on the
> > > broker by the cumulative size of the batch, and not of any one message?
> > > (makes sense if the broker doesn't unwrap a batch of messages before
> > > storing on the server).
> > >
> > > If I want to implement guaranteed delivery semantics, using the new
> > > request.required.acks configuration, I need to expose retry logic beyond
> > > that built into the producer?

--
-- Guozhang
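
For anyone following the thread, here is a rough sketch of the caller-side idea discussed above: a failure type that carries the number of attempts and a "retryable" flag, plus a retry loop with exponential backoff. Every name in it (RetrySketch, SendFailedException, UnrecoverableException, sendWithRetry) is hypothetical and does not exist in Kafka. The send is modelled as a plain Callable so the sketch does not depend on the producer API, and which real exceptions count as unrecoverable is left as an assumption.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch only: none of these types are part of Kafka.
final class RetrySketch {

    // Stand-in for a data-dependent failure (for example, a message that is
    // too large), which retrying the same payload would not fix.
    static class UnrecoverableException extends RuntimeException {
        UnrecoverableException(String message) { super(message); }
    }

    // Single failure type carrying the root cause, the number of attempts
    // made, and whether another attempt could plausibly succeed.
    static class SendFailedException extends RuntimeException {
        final int attempts;
        final boolean retryable;

        SendFailedException(String message, Throwable cause, int attempts, boolean retryable) {
            super(message, cause);
            this.attempts = attempts;
            this.retryable = retryable;
        }
    }

    // Calls `send` up to maxAttempts times, doubling the delay after each
    // failed attempt. Fails immediately on UnrecoverableException.
    static <T> T sendWithRetry(Callable<T> send, int maxAttempts, long initialBackoffMs) {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return send.call();
            } catch (UnrecoverableException e) {
                // Retrying cannot help: report it as not retryable.
                throw new SendFailedException("unrecoverable failure", e, attempt, false);
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    // Out of attempts, but one more try might still have worked.
                    throw new SendFailedException("retries exhausted", e, attempt, true);
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new SendFailedException("interrupted during backoff", ie, attempt, true);
            }
            backoffMs *= 2;
        }
    }
}
```

A caller could then catch SendFailedException and check the retryable flag to decide whether to re-enqueue the message or drop it, which is the distinction the two proposed exception names were meant to express.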