[
https://issues.apache.org/jira/browse/KAFKA-8350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
huxihx updated KAFKA-8350:
--------------------------
Description:
Currently, producers split batches based solely on the configured batch size. However,
the split will never succeed when the batch size is much larger than the
topic-level max message size.
For instance, if the batch size is set to 8MB while the broker-side
`message.max.bytes` keeps its default value (1000012, about 1MB), the producer
will endlessly try to split a large batch and never succeed, as shown below:
{code:java}
[2019-05-10 16:25:09,233] WARN [Producer clientId=producer-1] Got error produce
response in correlation id 61 on topic-partition test-0, splitting and retrying
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,021] WARN [Producer clientId=producer-1] Got error produce
response in correlation id 62 on topic-partition test-0, splitting and retrying
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:10,758] WARN [Producer clientId=producer-1] Got error produce
response in correlation id 63 on topic-partition test-0, splitting and retrying
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE
(org.apache.kafka.clients.producer.internals.Sender:617)
[2019-05-10 16:25:12,071] WARN [Producer clientId=producer-1] Got error produce
response in correlation id 64 on topic-partition test-0, splitting and retrying
(2147483647 attempts left). Error: MESSAGE_TOO_LARGE
(org.apache.kafka.clients.producer.internals.Sender:617){code}
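For reference, a minimal sketch that should reproduce the loop; the bootstrap
address, record sizes, and counts are illustrative assumptions rather than
values from the report:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ReproduceSplitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // batch.size (8MB) is far above the broker's default message.max.bytes (1000012).
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 8 * 1024 * 1024);
        // Raise the client-side request cap so the oversized batch actually reaches the broker.
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 16 * 1024 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // give the batch time to fill

        try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Enough 1KB records to push a single batch well past the 1MB broker limit;
            // the broker rejects it with MESSAGE_TOO_LARGE and splitting never succeeds.
            for (int i = 0; i < 2000; i++) {
                producer.send(new ProducerRecord<>("test", new byte[1024]));
            }
        }
    }
}
{code}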
A better solution is to have the producer split based on the minimum of
these two configs. However, it is tricky for the client to get the topic-level
or broker-level config values. There seem to be three ways to do this:
# When the broker throws `RecordTooLargeException`, do not swallow its real
message, since it already contains the max message size. If the message is not
swallowed, the client can easily extract the limit from the response.
# Add code to issue a `DescribeConfigsRequest` to retrieve the value (see the
sketch after the code block below).
# If a split fails, decrease the batch size gradually until the split
succeeds. For example,
{code:java}
// In RecordAccumulator.java
private int steps = 1; // grows by one after each failed split attempt
......
public int splitAndReenqueue(ProducerBatch bigBatch) {
    ......
    // Shrink the target size on every retry instead of always splitting at batchSize.
    Deque<ProducerBatch> dq = bigBatch.split(this.batchSize / steps);
    if (dq.size() == 1) // split failed: the batch could not be broken up at this size
        steps++;
    ......
}{code}
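For the second option, a minimal sketch of fetching the topic-level limit
through the admin client, which issues a `DescribeConfigsRequest` under the
hood; the bootstrap address and the surrounding class are illustrative
assumptions:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class FetchMaxMessageBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "test");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // Topic-level limit; it defaults to the broker-side message.max.bytes.
            int maxMessageBytes = Integer.parseInt(config.get("max.message.bytes").value());
            // The producer could then split on the smaller of the two limits.
            int batchSize = 8 * 1024 * 1024; // example producer batch.size
            int splitSize = Math.min(batchSize, maxMessageBytes);
            System.out.println("split size = " + splitSize);
        }
    }
}
{code}
This approach costs an extra round trip (and the metadata can go stale if the
topic config changes), whereas the first option reuses information the broker
already sends back with the error.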
Do all of these make sense?
> Splitting batches should consider topic-level message size
> ----------------------------------------------------------
>
> Key: KAFKA-8350
> URL: https://issues.apache.org/jira/browse/KAFKA-8350
> Project: Kafka
> Issue Type: Improvement
> Components: producer
> Affects Versions: 2.3.0
> Reporter: huxihx
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)