[ https://issues.apache.org/jira/browse/KAFKA-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
huxihx updated KAFKA-8350:
--------------------------
Description:
Currently, producers split batches based on the batch size alone. However, the split will never succeed when the batch size is much larger than the topic-level max message size.

For instance, if the batch size is set to 8MB while the broker-side `message.max.bytes` keeps its default value (1000012, about 1MB), the producer endlessly tries to split a large batch but never succeeds, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce response in correlation id 61 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce response in correlation id 62 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce response in correlation id 63 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce response in correlation id 64 on topic-partition test-0, splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE (org.apache.kafka.clients.producer.internals.Sender:617)
{code}
A better solution is to have the producer split based on the minimum of these two configs. However, it is tricky for the client to obtain the topic-level or broker-level config values. There seem to be three ways to do this:
# When the broker throws `RecordTooLargeException`, do not swallow its real message, since it already contains the max message size. If the message is not swallowed, the client can easily get it from the response.
# Issue a `DescribeConfigsRequest` to retrieve the value (a rough Admin-client sketch follows after the list).
# If splitting failed, decrease the batch size gradually until the split succeeds. For example,
{code:java}
// In RecordAccumulator.java
private int steps = 1;
......
public int splitAndReenqueue(ProducerBatch bigBatch) {
    ......
    // Shrink the split target on every failed attempt by increasing the divisor.
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
    if (dq.size() == 1) // split failed
        steps++;
    ......
}{code}
Do all of these make sense?
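For illustration only, here is a rough sketch of option 2 using the public Admin client to fetch the effective topic-level `max.message.bytes`. The producer internals would have to do the equivalent through their own network client; the topic name "test" and the bootstrap address below are placeholders, not values from this ticket:
{code:java}
// Sketch only: look up the effective per-topic max message size via DescribeConfigs.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class MaxMessageSizeLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address for the sketch.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Placeholder topic name; the producer would use the actual target topic.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Map<ConfigResource, Config> configs =
                admin.describeConfigs(Collections.singleton(topic)).all().get();
            // max.message.bytes falls back to the broker's message.max.bytes when it is
            // not overridden on the topic, so this is the effective per-topic limit.
            int maxMessageBytes = Integer.parseInt(
                configs.get(topic).get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value());
            // The split target could then be min(batch.size, maxMessageBytes).
            System.out.println("Effective max.message.bytes = " + maxMessageBytes);
        }
    }
}{code}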
> Splitting batches should consider topic-level message size
> ----------------------------------------------------------
>
>                 Key: KAFKA-8350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: producer
>    Affects Versions: 2.3.0
>            Reporter: huxihx
>            Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)