[ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14333041#comment-14333041
 ] 

Ewen Cheslack-Postava commented on KAFKA-1788:
----------------------------------------------

Ok, I'll try to clear up a few issues.

I think just making sure NetworkClient.leastLoadedNode eventually returns all 
nodes isn't sufficient; I was only raising another case where this issue could 
occur. The reason it isn't sufficient for the original case is the type of 
situation [~Bmis13] raises. If you have a temporary network outage to a single 
broker (e.g. due to firewall misconfiguration or a network partition), it may 
still correctly be listed as leader. If holding the data in the 
RecordAccumulator only affected data sent to that one broker, then as 
[~jkreps] points out, we could potentially get away with holding on to the 
messages indefinitely, since errors should manifest in other ways. (I think 
it's *better* to have the timeouts, but not strictly necessary.)
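
For context, the node selection in question works roughly like the sketch 
below -- a heavily simplified version of NetworkClient.leastLoadedNode, not 
the actual code (the real method also considers connection state and 
reconnect backoff):

    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import org.apache.kafka.common.Node;

    public class LeastLoadedSketch {
        // Pick the known node with the fewest in-flight requests, starting at
        // a random offset. Note that an unreachable broker which looks idle
        // (zero in-flight requests) can keep winning this selection.
        static Node leastLoadedNode(List<Node> nodes,
                                    Map<Integer, Integer> inFlightByNode,
                                    Random random) {
            Node best = null;
            int bestLoad = Integer.MAX_VALUE;
            int offset = random.nextInt(nodes.size());
            for (int i = 0; i < nodes.size(); i++) {
                Node node = nodes.get((offset + i) % nodes.size());
                Integer inFlight = inFlightByNode.get(node.id());
                int load = (inFlight == null) ? 0 : inFlight;
                if (load < bestLoad) {
                    best = node;
                    bestLoad = load;
                }
            }
            return best;
        }
    }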

However, since the RecordAccumulator is a shared resource, holding onto these 
messages also means you'll block sending data to other brokers once the buffer 
fills up with data for the unreachable broker. Adding timeouts at least 
ensures that messages for those other brokers eventually get a chance to be 
sent, even if there are periods where new messages are rejected because the 
buffer is already full. So [~parth.brahmbhatt], I think the approach you're 
taking in the patch is definitely the right thing to do, and I agree with 
[~Bmis13] that the error record metrics should definitely (eventually) be 
increasing.
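
Concretely, I'd expect the accumulator to end up with something along these 
lines -- a sketch only, with illustrative names (createdMs, requestTimeoutMs, 
the BufferPool field "free"), not the exact code from the patch:

    // Sketch of a method inside RecordAccumulator: on each ready-check,
    // expire batches that have been sitting longer than the configured
    // timeout, complete their callbacks with an error, and release their
    // buffer space so other topic-partitions can make progress. This is also
    // where the error-record metrics would be bumped.
    private List<RecordBatch> expireStaleBatches(Deque<RecordBatch> deque,
                                                 long nowMs,
                                                 long requestTimeoutMs) {
        List<RecordBatch> expired = new ArrayList<RecordBatch>();
        synchronized (deque) {
            Iterator<RecordBatch> iter = deque.iterator();
            while (iter.hasNext()) {
                RecordBatch batch = iter.next();
                if (nowMs - batch.createdMs > requestTimeoutMs) {
                    iter.remove();
                    expired.add(batch);
                }
            }
        }
        for (RecordBatch batch : expired) {
            // Fail the callbacks with a timeout and free the memory.
            batch.done(-1L, new TimeoutException(
                "Batch expired before it could be sent"));
            this.free.deallocate(batch.records.buffer());
        }
        return expired;
    }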

More generally -- yes, pretty much everything that could potentially block 
for a long time or indefinitely *should* have a timeout. In many cases this is 
true even if the operation would eventually time out "naturally", e.g. due to 
a TCP timeout. It's better to have control over the timeout (even if we highly 
recommend using the default values) than to rely on settings from other 
systems, especially when those may be adjusted in unexpected ways outside of 
our control. This is a pervasive concern we should keep an eye out for in new 
code, and we should file JIRAs as we find missing timeouts in existing code.
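
As a concrete example on the producer side, the new producer already exposes 
its own knobs for the blocking paths instead of leaning on TCP/OS defaults, 
and the timeout this patch adds would fit the same pattern (the values below 
are just examples):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    // Bound how long send() may block fetching metadata, instead of relying
    // on a lower-level timeout we don't control.
    props.put("metadata.fetch.timeout.ms", "60000");
    // Bound how long the broker may wait for acks before failing the request.
    props.put("timeout.ms", "30000");
    // Shared accumulator buffer; with the timeout from this patch, records
    // for an unreachable leader would eventually be failed rather than
    // pinned here forever.
    props.put("buffer.memory", "33554432");
    // Fail fast instead of blocking indefinitely when the buffer is full.
    props.put("block.on.buffer.full", "false");
    KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<byte[], byte[]>(props,
            new ByteArraySerializer(), new ByteArraySerializer());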

Given the above, I think the options for controlling memory usage may not be 
very good for some use cases -- we've been telling people to use a single 
producer where possible since it's very fast and you benefit from sharing the 
network thread: all the data for topic-partitions destined for the same broker 
can be collected into a single request. But it turns out that sharing the 
underlying resource (the buffer) can lead to starvation for some 
topic-partitions when it shouldn't really be necessary. Would it make sense to 
allow a per-topic, or even per-partition, limit on memory usage? The effect 
would be similar to fetch.message.max.bytes for the consumer, where the actual 
memory usage cap is n times the configured value, with n being the number of 
topic-partitions you're working with. It could also be per broker, but I think 
that leads to much less intuitive and harder-to-predict behavior. If people 
think that's a good idea, we can file an additional issue for it.
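
If we went that route, the accounting could be a thin layer in front of the 
accumulator's append path. This is a purely hypothetical sketch -- nothing 
like it exists today, and the class and config names are made up:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical per-partition cap on buffer usage, so one unreachable
    // leader can't starve every other topic-partition of buffer space.
    public class PartitionMemoryLimiter {
        private final long maxBytesPerPartition;
        private final ConcurrentMap<TopicPartition, AtomicLong> usage =
            new ConcurrentHashMap<TopicPartition, AtomicLong>();

        public PartitionMemoryLimiter(long maxBytesPerPartition) {
            this.maxBytesPerPartition = maxBytesPerPartition;
        }

        // Called before appending: returns false if this partition has
        // already used its share, in which case the producer would reject
        // the record instead of eating into everyone else's buffer.
        public boolean tryReserve(TopicPartition tp, int recordSize) {
            AtomicLong used = usage.get(tp);
            if (used == null) {
                AtomicLong fresh = new AtomicLong();
                AtomicLong prev = usage.putIfAbsent(tp, fresh);
                used = (prev == null) ? fresh : prev;
            }
            long after = used.addAndGet(recordSize);
            if (after > maxBytesPerPartition) {
                used.addAndGet(-recordSize);
                return false;
            }
            return true;
        }

        // Called when a batch for this partition is sent, failed, or expired.
        public void release(TopicPartition tp, int recordSize) {
            AtomicLong used = usage.get(tp);
            if (used != null)
                used.addAndGet(-recordSize);
        }
    }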

> producer record can stay in RecordAccumulator forever if leader is not 
> available
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1788
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer 
>    Affects Versions: 0.8.2.0
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>              Labels: newbie++
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1788.patch, KAFKA-1788_2015-01-06_13:42:37.patch, 
> KAFKA-1788_2015-01-06_13:44:41.patch
>
>
> In the new producer, when a partition has no leader for a long time (e.g., 
> all replicas are down), the records for that partition will stay in the 
> RecordAccumulator until the leader is available. This may cause the 
> bufferpool to be full and the callback for the produced message to block for 
> a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
