Jiangjie, I changed my code to group messages by partition, then, within each
partition, to batch the messages into chunks of at most 900 KB of uncompressed
data, and then send those chunks out. That worked fine and didn't cause any
MessageSizeTooLarge errors. So it looks like the issue is that the producer
batches all the messages for a given partition together and then compresses
them as a single message set, which can end up larger than the broker's limit.
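
For reference, here's roughly what my batching code does now. This is just a
sketch, not my exact code: it assumes byte[] payloads where every message has
a non-null key, and the class name, partitionFor(), and MAX_UNCOMPRESSED_BYTES
are made up for illustration. partitionFor() just mirrors the default
partitioner's key hashing, so adjust it if you use a custom partitioner.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class PartitionedBatchSender {
    // Cap each send() at ~900 KB of uncompressed payload, leaving ~100 KB of
    // headroom under the broker's ~1 MB limit in case snappy barely helps.
    private static final int MAX_UNCOMPRESSED_BYTES = 900 * 1024;

    private final Producer<String, byte[]> producer;
    private final int numPartitions;

    public PartitionedBatchSender(Producer<String, byte[]> producer,
                                  int numPartitions) {
        this.producer = producer;
        this.numPartitions = numPartitions;
    }

    // Assumption: this matches the default partitioner (hash of the key mod
    // partition count; the mask keeps the hash non-negative). If you plug in
    // a custom partitioner, this must match it or the grouping is wrong.
    private int partitionFor(String key) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public void send(List<KeyedMessage<String, byte[]>> messages) {
        // 1. Group the messages by the partition they will land on.
        Map<Integer, List<KeyedMessage<String, byte[]>>> byPartition =
                new HashMap<Integer, List<KeyedMessage<String, byte[]>>>();
        for (KeyedMessage<String, byte[]> m : messages) {
            int partition = partitionFor(m.key());
            List<KeyedMessage<String, byte[]>> group = byPartition.get(partition);
            if (group == null) {
                group = new ArrayList<KeyedMessage<String, byte[]>>();
                byPartition.put(partition, group);
            }
            group.add(m);
        }
        // 2. Within each partition, send chunks of at most ~900 KB of
        //    uncompressed payload, so each compressed message set stays
        //    well under the broker's limit.
        for (List<KeyedMessage<String, byte[]>> group : byPartition.values()) {
            List<KeyedMessage<String, byte[]>> chunk =
                    new ArrayList<KeyedMessage<String, byte[]>>();
            int chunkBytes = 0;
            for (KeyedMessage<String, byte[]> m : group) {
                int size = m.message().length;
                if (!chunk.isEmpty()
                        && chunkBytes + size > MAX_UNCOMPRESSED_BYTES) {
                    producer.send(chunk);
                    chunk = new ArrayList<KeyedMessage<String, byte[]>>();
                    chunkBytes = 0;
                }
                chunk.add(m);
                chunkBytes += size;
            }
            if (!chunk.isEmpty()) {
                producer.send(chunk);
            }
        }
    }
}

Capping on uncompressed size is conservative since snappy almost always
shrinks the data, but the headroom covers the case where it doesn't.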

It would be nice if the producer could do something smarter here, but it's
probably difficult to predict the post-compression size and whether it would
hit the limit, and so on :/


On Wed, May 13, 2015 at 9:57 AM, Jamie X <jamiex...@gmail.com> wrote:

> (sorry if this messes up the mailing list, I didn't seem to get replies in
> my inbox)
>
> Jiangjie, I am indeed using the old producer, in sync mode.
>
> > Notice that the old producer uses number of messages as batch limitation
> instead of number of bytes.
>
> Can you clarify this? I see a setting batch.num.messages but it is only
> for async and wouldn't affect sync mode.
>
> > But in your case, it seems you have a single message whose compressed
> size is larger than the max message size Kafka broker accepts. Any idea why?
>
> I don't think this is the case, as my messages are at most 70 KB
> uncompressed. (I checked the message sizes it was trying to send.)
>
> When you say
> > the list of message will be sent as a batch
>
> does that mean that the producer would group the messages by partition,
> and for each partition, it would batch all the messages for that partition
> together, regardless of whether it would exceed a size limit? If so, that
> may explain things.
>
> Thanks,
> Jamie
>
>
> On Tue, May 12, 2015 at 4:40 PM, Jamie X <jamiex...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
>> messages, and also have compression on (snappy in this case), how does it
>> decide how many messages to put together as one?
>>
>> The reason I'm asking is that even though my messages are only 70 KB
>> uncompressed, the broker complains that I'm hitting the 1 MB message limit,
>> for example:
>>
>> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes which exceeds the maximum configured message size of 1000012.
>>         at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378)
>>         at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>         at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
>>         at kafka.log.Log.append(Log.scala:257)
>>         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
>>         at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
>>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>>         at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>>         at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
>>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>
>> Thanks,
>> Jamie
>>
>
>
