I am referring to the wiki at http://kafka.apache.org/08/configuration.html.
As far as I know, the following parameter controls the maximum batch size in
bytes. Kafka community, please correct me if I am wrong; I do not want to
create confusion for the Kafka user community here. Also, if you increase
this limit, then you have to increase the corresponding limit on the
consumer side as well (fetch.message.max.bytes).

Since we are using batch async mode, our messages are sometimes dropped
when the total bytes of the batch exceed this limit, because the entire
batch is rejected by the brokers. So I was asking the Kafka developers
whether there is an optimal way to determine the batch size based on this
limit, to minimize the data loss.
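
To illustrate the kind of client-side logic I have in mind, here is a
minimal sketch in Python. It is not Kafka code: `split_into_batches` and
`max_batch_bytes` are hypothetical names, and the sketch assumes the limit
applies to the summed payload sizes, ignoring per-message protocol overhead
and compression.

```python
def split_into_batches(messages, max_batch_bytes):
    """Greedily group messages so each batch stays within max_batch_bytes.

    Returns (batches, oversized). `oversized` holds messages that exceed
    the limit on their own, so no batching choice could make them fit.
    """
    batches, oversized = [], []
    current, current_bytes = [], 0
    for msg in messages:
        size = len(msg)
        if size > max_batch_bytes:
            # A single message over the limit would be rejected anyway;
            # set it aside instead of poisoning a whole batch.
            oversized.append(msg)
            continue
        if current and current_bytes + size > max_batch_bytes:
            # Adding this message would overflow the batch: close it.
            batches.append(current)
            current, current_bytes = [], 0
        current.append(msg)
        current_bytes += size
    if current:
        batches.append(current)
    return batches, oversized
```

With a limit of 1000 bytes and payloads of 400, 700, 1200, 300 and 300
bytes, this would send three batches and set the 1200-byte message aside
rather than losing its neighbours.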

message.max.bytes (default: 1000000): The maximum size of a message that
the server can receive. It is important that this property be in sync with
the maximum fetch size your consumers use or else an unruly producer will
be able to publish messages too large for consumers to consume.
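
The "in sync" requirement can be expressed as a small sanity check. This is
only a sketch in Python: `consumers_can_fetch` is a hypothetical helper, not
part of Kafka, and the numbers in the usage lines are example values.

```python
def consumers_can_fetch(message_max_bytes, fetch_message_max_bytes):
    """True if the consumer fetch size covers the largest message the
    broker accepts (broker message.max.bytes vs. consumer
    fetch.message.max.bytes)."""
    return fetch_message_max_bytes >= message_max_bytes


# Example values only: a broker limit raised to 2 MB with a 1 MB consumer
# fetch size would leave large messages that consumers cannot read.
print(consumers_can_fetch(1000000, 1048576))
print(consumers_can_fetch(2097152, 1048576))
```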

Thanks,

Bhavesh


On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
alexis.mi...@airbedandbreakfast.com> wrote:

> Hi Bhavesh
>
> can you explain what limit you're referring to?
> I'm asking because `message.max.bytes` is applied per message not per
> batch.
> is there another limit I should be aware of?
>
> thanks
>
>
> On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > We have a similar problem. Our messages are of variable length, so with
> > a fixed batch size the batch sometimes exceeds the limit set on the
> > brokers (2MB).
> >
> > So could the producer have some extra logic to determine the optimal
> > batch size by looking at the configured message.max.bytes value?
> >
> > During the metadata update, the producer would get this value from the
> > broker for each topic. The producer would then check whether the current
> > batch has reached this limit and, if so, break the batch into smaller
> > chunks so that it does not exceed the limit (unless a single message
> > exceeds the limit). Basically, try to avoid data loss as much as
> > possible.
> >
> > Please let me know what is your opinion on this...
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > alexis.mi...@airbedandbreakfast.com> wrote:
> >
> > > Thanks Jun.
> > >
> > > I'll create a jira and try to provide a patch. I think this is pretty
> > > serious.
> > >
> > > On Friday, August 29, 2014, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > > The goal of batching is mostly to reduce the number of RPC calls to
> > > > > the broker. If compression is enabled, a larger batch typically
> > > > > implies better compression ratio.
> > > > >
> > > > > The reason that we have to fail the whole batch is that the error
> > > > > code in the produce response is per partition, instead of per
> > > > > message.
> > > >
> > > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > > Could you explain the goals of batches? I was assuming this was
> > > > > > simply a performance optimization, but this behavior makes me
> > > > > > think I'm missing something.
> > > > > > is a batch more than a list of *independent* messages?
> > > > > >
> > > > > > Why would you reject the whole batch? One invalid message causes
> > > > > > the loss of batch.num.messages-1 messages :(
> > > > > > It seems pretty critical to me.
> > > > > >
> > > > > > If ack=0, the producer will never know about it.
> > > > > > If ack !=0, the producer will retry the whole batch. If the issue
> > > > > > was related to data corruption (etc), retries might work. But in
> > > > > > the case of "big message", the batch will always be rejected and
> > > > > > the producer will give up.
> > > > > >
> > > > > > If the messages are indeed considered independent, I think this
> > > > > > is a pretty serious issue.
> > > > > >
> > > > > > I see 2 possible fix approaches:
> > > > > > - the broker could reject only the invalid messages
> > > > > > - the broker could reject the whole batch (like today) but the
> > > > > >   producer (if ack!=0) could retry messages one at a time on
> > > > > >   exception like "MessageSizeTooLarge".
> > > > >
> > > > > opinions?
> > > > >
> > > > > Alexis
> > > > >
> > > > > ```
> > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id
> 46
> > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id
> 51
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id
> 56
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id
> 61
> > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics
> > test
> > > > > with correlation ids in [43,62]
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > > (kafka.producer.async.ProducerSendThread)
> > > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > > after
> > > > 3
> > > > > tries.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > >  at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > >  at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > ```
> > > > >
> > > > >
> > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > > That's right. If one message in a batch exceeds the size limit,
> > > > > > > the whole batch is rejected.
> > > > > > >
> > > > > > > When determining message.max.bytes, the most important thing to
> > > > > > > consider is probably memory since currently we need to allocate
> > > > > > > memory for a full message in the broker and the producer and
> > > > > > > the consumer client.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > >
> > > > > > > > Am I misreading this loop?
> > > > > > > >
> > > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > >
> > > > > > > > It seems like all messages from `validMessages` (which is a
> > > > > > > > ByteBufferMessageSet) are NOT appended if one of the message
> > > > > > > > sizes exceeds the limit.
> > > > > > >
> > > > > > > I hope I'm missing something.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > > thanks for your answer.
> > > > > > > > > Unfortunately the size won't help much, I'd like to see the
> > > > > > > > > actual message data.
> > > > > > > >
> > > > > > > > By the way what are the things to consider when deciding on
> > > > > > > > `message.max.bytes` value?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > >> The message size check is currently only done on the broker.
> > > > > > > > >> If you enable trace level logging in RequestChannel, you
> > > > > > > > >> will see the produce request, which includes the size of
> > > > > > > > >> each partition.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> Jun
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > >> alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > >>
> > > > > > > >> > Hello,
> > > > > > > >> >
> > > > > > > >> > my brokers are reporting that some received messages exceed
> > > > > > > >> > the `message.max.bytes` value.
> > > > > > > >> > I'd like to know which producers are at fault but it is
> > > > > > > >> > pretty much impossible:
> > > > > > > >> > - the brokers don't log the content of the rejected messages
> > > > > > > >> > - the log messages do not contain the IP of the producers
> > > > > > > >> > - on the consumer side, no exception is thrown (afaik it is
> > > > > > > >> >   because Ack-0 is used). The only kind of notification is
> > > > > > > >> >   that the connection is closed.
> > > > > > > >> >
> > > > > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > > > > >> > producers or find out the message content?
> > > > > > > >> >
> > > > > > > >> > Even though it makes total sense to have the limit defined
> > > > > > > >> > and applied on the brokers, I was thinking that this check
> > > > > > > >> > could also be applied by the producers. Some google results
> > > > > > > >> > suggest that `message.max.bytes` might be used by the
> > > > > > > >> > producers but I can't find any trace of that behavior in
> > > > > > > >> > the code.
> > > > > > > >> >
> > > > > > > >> > The closest thing I have is
> > > > > > > >> >
> > > > > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > >> >
> > > > > > > >> > but it simply logs the message size and content and the log
> > > > > > > >> > level is trace.
> > > > > > > >> >
> > > > > > > >> > [2] could you please confirm if such a producer-side check
> > > > > > > >> > exists?
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > thanks!
> > > > > > > >> >
> > > > > > > >> > Alexis
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
