The goal of batching is mostly to reduce the number of RPC calls to the
broker. If compression is enabled, a larger batch also typically yields a
better compression ratio.
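
For reference, here is a minimal sketch of an async (batching) producer with
compression using the 0.8 Scala client; the broker list, batch size, and
linger time below are placeholders, not recommendations:

```
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
// async mode is what enables client-side batching
props.put("producer.type", "async")
// up to this many messages are grouped into a single produce request
props.put("batch.num.messages", "200")
// or after this much time, whichever comes first
props.put("queue.buffering.max.ms", "500")
// the batch is compressed as a unit, so larger batches compress better
props.put("compression.codec", "snappy")

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test", "hello"))
producer.close()
```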

The reason that we have to fail the whole batch is that the error code in
the produce response is per partition, instead of per message.
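
To illustrate the granularity (a rough sketch only; the field and type names
are approximate, not the actual kafka.api classes): there is one error/offset
pair per partition, with no way to point at an individual message in the batch.

```
// One status per (topic, partition); "message 3 of the 200 was too large"
// cannot be expressed, so the partition's MessageSet is accepted or
// rejected as a whole.
case class PartitionProduceStatus(error: Short, offset: Long)

val status: Map[(String, Int), PartitionProduceStatus] = Map(
  ("test", 0) -> PartitionProduceStatus(10.toShort, -1L),  // MessageSizeTooLarge
  ("test", 1) -> PartitionProduceStatus(0.toShort, 1234L)  // no error
)
```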

Retrying individual messages on MessageSizeTooLarge seems reasonable.
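
Something along these lines could also be done on the application side today.
This is only a sketch: the helper is hypothetical, it assumes a sync producer
(producer.type=sync), and note that the high-level producer surfaces failures
as FailedToSendMessageException after its own retries rather than the specific
size error.

```
import kafka.common.FailedToSendMessageException
import kafka.producer.{KeyedMessage, Producer}

// If a batch is rejected, resend its messages one at a time so that only
// the genuinely oversized ones are lost; returns the messages that still
// failed individually.
def sendWithFallback(producer: Producer[String, String],
                     batch: Seq[KeyedMessage[String, String]]): Seq[KeyedMessage[String, String]] =
  try {
    producer.send(batch: _*)
    Seq.empty // whole batch accepted
  } catch {
    case _: FailedToSendMessageException =>
      batch.filter { msg =>
        try { producer.send(msg); false }                      // accepted on its own
        catch { case _: FailedToSendMessageException => true } // still rejected
      }
  }
```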

Thanks,

Jun


On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Could you explain the goals of batches? I was assuming this was simply a
> performance optimization, but this behavior makes me think I'm missing
> something.
> Is a batch more than a list of *independent* messages?
>
> Why would you reject the whole batch? One invalid message causes the loss
> of batch.num.messages-1 messages :(
> It seems pretty critical to me.
>
> If ack=0, the producer will never know about it.
> If ack != 0, the producer will retry the whole batch. If the issue was
> related to data corruption (etc.), retries might work. But in the case of a
> "big message", the batch will always be rejected and the producer will give
> up.
>
> If the messages are indeed considered independent, I think this is a pretty
> serious issue.
>
> I see 2 possible fix approaches:
> - the broker could reject only the invalid messages
> - the broker could reject the whole batch (like today), but the producer (if
> ack != 0) could retry messages one at a time on an exception like
> MessageSizeTooLarge.
>
> Opinions?
>
> Alexis
>
> ```
> [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> ```
>
>
> On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > That's right. If one message in a batch exceeds the size limit, the whole
> > batch is rejected.
> >
> > When determining message.max.bytes, the most important thing to consider is
> > probably memory, since we currently need to allocate memory for a full
> > message in the broker as well as in the producer and consumer clients.
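> >
> > For anyone tuning this, here is a sketch of the settings that usually need
> > to move together (the values are illustrative placeholders, not
> > recommendations):
> >
> > ```
> > # broker (server.properties): largest message (or compressed message set) accepted
> > message.max.bytes=1000000
> > # replica fetcher must be able to fetch the largest accepted message
> > replica.fetch.max.bytes=1048576
> >
> > # high-level consumer: must be >= message.max.bytes or large messages can never be consumed
> > fetch.message.max.bytes=1048576
> > ```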
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com> wrote:
> >
> > > Am I misreading this loop:
> > >
> > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > >
> > > It seems like all messages from `validMessages` (which is a
> > > ByteBufferMessageSet) are NOT appended if a single message's size exceeds
> > > the limit.
> > >
> > > I hope I'm missing something.
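> > >
> > > For readers following along, this is roughly what that loop does in 0.8.1
> > > (a paraphrase with approximate names, not the literal source):
> > >
> > > ```
> > > import kafka.common.MessageSizeTooLargeException
> > > import kafka.message.{ByteBufferMessageSet, MessageSet}
> > >
> > > // Size check performed during Log.append (see Log.scala around L265-269).
> > > def checkMessageSizes(validMessages: ByteBufferMessageSet, maxMessageSize: Int): Unit =
> > >   for (entry <- validMessages.shallowIterator) {
> > >     val messageSize = MessageSet.entrySize(entry.message)
> > >     if (messageSize > maxMessageSize)
> > >       // throwing aborts the whole append, which is why one oversized
> > >       // message rejects the entire batch; skipping just the offending
> > >       // entry here would be the "reject only the invalid messages" option
> > >       throw new MessageSizeTooLargeException(
> > >         "Message size is %d bytes which exceeds the maximum configured message size of %d."
> > >           .format(messageSize, maxMessageSize))
> > >   }
> > > ```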
> > >
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > thanks for your answer.
> > > > Unfortunately, the size won't help much; I'd like to see the actual
> > > > message data.
> > > >
> > > > By the way, what are the things to consider when deciding on the
> > > > `message.max.bytes` value?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > >> The message size check is currently done only on the broker. If you
> > > >> enable trace-level logging in RequestChannel, you will see the produce
> > > >> request, which includes the size of each partition.
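> > > >>
> > > >> For reference, a sketch of the log4j change that typically enables this,
> > > >> assuming the request appender from the stock config/log4j.properties:
> > > >>
> > > >> ```
> > > >> # config/log4j.properties
> > > >> log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
> > > >> log4j.additivity.kafka.network.RequestChannel$=false
> > > >> ```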
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > >> alexis.mi...@airbedandbreakfast.com> wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > my brokers are reporting that some received messages exceed the
> > > >> > `message.max.bytes` value.
> > > >> > I'd like to know which producers are at fault, but it is pretty much
> > > >> > impossible:
> > > >> > - the brokers don't log the content of the rejected messages
> > > >> > - the log messages do not contain the IP of the producers
> > > >> > - on the consumer side, no exception is thrown (afaik because ack=0 is
> > > >> > used). The only kind of notification is that the connection is closed.
> > > >> >
> > > >> > [1] Do you have any suggestions to track down the guilty producers or
> > > >> > find out the message content?
> > > >> >
> > > >> > Even though it makes total sense to have the limit defined and applied
> > > >> > on the brokers, I was thinking that this check could also be applied by
> > > >> > the producers. Some Google results suggest that `message.max.bytes`
> > > >> > might be used by the producers, but I can't find any trace of that
> > > >> > behavior in the code.
> > > >> >
> > > >> > The closest thing I have is
> > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > >> > but it simply logs the message size and content, and the log level is
> > > >> > trace.
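> > > >> >
> > > >> > As a stopgap I could add an application-level guard before sending; here
> > > >> > is a rough sketch (the helper is mine, and maxMessageBytes has to be kept
> > > >> > in sync with the broker's message.max.bytes by hand), but a built-in
> > > >> > check would be nicer:
> > > >> >
> > > >> > ```
> > > >> > import kafka.message.{Message, MessageSet}
> > > >> >
> > > >> > // must match the broker's message.max.bytes
> > > >> > val maxMessageBytes = 1000000
> > > >> >
> > > >> > // true if the encoded message (key + value + overhead) would fit;
> > > >> > // check this before handing the payload to the producer, and log the
> > > >> > // offenders so the guilty application is at least identifiable
> > > >> > def fitsOnBroker(value: Array[Byte], key: Array[Byte] = null): Boolean =
> > > >> >   MessageSet.entrySize(new Message(value, key)) <= maxMessageBytes
> > > >> > ```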
> > > >> >
> > > >> > [2] Could you please confirm if such a producer-side check exists?
> > > >> >
> > > >> >
> > > >> > thanks!
> > > >> >
> > > >> > Alexis
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
