> On April 21, 2015, 11:56 p.m., Guozhang Wang wrote:
> > This piece of logic has become quite complex and awkward to me by now. For
> > example, in ready() a node is considered not ready only if ALL of its
> > partitions are either not sendable or in the backoff period, and the
> > reason we get the ready nodes before draining is to check whether they
> > are "really" ready or not. This is mainly because 1) we need to be
> > careful about the timeout value when calling client.poll() later, in
> > order to avoid busy waiting, and 2) we need to make sure that if a
> > metadata refresh is needed, it is sent at higher priority than other
> > requests.
> > 
> > I suggest rewriting this fragment of code to make it clearer, with the
> > following process:
> > 
> > 0. While handling the metadata response and updating the metadata, check
> > whether ANY partition has an unknown leader; if so, set
> > metadata.requestUpdate. Then we no longer need to do this step at the
> > start of run().
> > 
> > 1. Get all the ready nodes based on their connection state only (i.e. no
> > peeking into RecordAccumulator), and record node_backoff as the min of
> > (reconnection_backoff - time_waited) over all nodes; if one of these
> > nodes is connected or connecting, this backoff should be 0.
> > 
> > 2. For each of the ready nodes, try to drain their corresponding
> > partitions in RecordAccumulator while considering all kinds of
> > conditions (full, expired, exhausted, etc.), and record data_backoff as
> > the min of (retry_backoff - time_waited) over all partitions; if one of
> > the partitions is immediately sendable, this backoff should be 0.
> > 
> > 3. Formulate the produce request and call client.poll() with timeout =
> > reconnection_backoff > 0 ? reconnection_backoff : retry_backoff.
> > 
> > 4. In NetworkClient.poll(), the "maybeUpdateMetadata" logic and the way
> > metadataTimeout is updated can also be simplified.
> > 
> > This may contain some flaws; Jiangjie / Ewen, let me know if you see any
> > issues.
> 
> Jiangjie Qin wrote:
>     Hi Guozhang, I think that makes sense. We should swap the order of the
>     checks: check connection readiness first, then data readiness. I'll
>     try to submit a refactored patch and will raise questions if anything
>     comes up. Thanks.
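For readers following the proposal, here is a minimal sketch of what steps 1-3 above could look like as a single loop iteration. Every type and method below (Cluster, Accumulator, drainAndGetBackoffMs, etc.) is a hypothetical stand-in for the real client internals, not the actual Kafka API:

    import java.util.List;

    // Sketch of the run() iteration proposed in steps 1-3 above. All types
    // and methods here are hypothetical stand-ins, not the real client code.
    public class SenderLoopSketch {

        interface Cluster {
            List<String> nodes();                         // all known broker ids
            boolean isConnectedOrConnecting(String node);
            long timeWaitedMs(String node, long now);     // since last connect attempt
        }

        interface Accumulator {
            // Drains sendable batches for the node and returns the remaining
            // wait of the closest retry-backed-off partition, or 0 if any
            // partition was immediately sendable.
            long drainAndGetBackoffMs(String node, long now);
        }

        private final Cluster cluster;
        private final Accumulator accumulator;
        private final long reconnectionBackoffMs;
        private final long retryBackoffMs;

        public SenderLoopSketch(Cluster cluster, Accumulator accumulator,
                                long reconnectionBackoffMs, long retryBackoffMs) {
            this.cluster = cluster;
            this.accumulator = accumulator;
            this.reconnectionBackoffMs = reconnectionBackoffMs;
            this.retryBackoffMs = retryBackoffMs;
        }

        /** One iteration of the proposed loop; returns the client.poll() timeout. */
        public long runOnce(long now) {
            // Step 1: node readiness from connection state only, recording
            // node_backoff = min(reconnection_backoff - time_waited) over nodes.
            long nodeBackoff = reconnectionBackoffMs;
            for (String node : cluster.nodes()) {
                if (cluster.isConnectedOrConnecting(node)) {
                    nodeBackoff = 0;
                    break;
                }
                long remaining = Math.max(0L, reconnectionBackoffMs - cluster.timeWaitedMs(node, now));
                nodeBackoff = Math.min(nodeBackoff, remaining);
            }

            // Step 2: drain partitions of the ready nodes, recording
            // data_backoff = min(retry_backoff - time_waited) over partitions.
            long dataBackoff = retryBackoffMs;
            for (String node : cluster.nodes()) {
                if (cluster.isConnectedOrConnecting(node))
                    dataBackoff = Math.min(dataBackoff, accumulator.drainAndGetBackoffMs(node, now));
            }

            // Step 3: poll with the connection backoff if we are still waiting
            // on connections, otherwise with the data (retry) backoff.
            return nodeBackoff > 0 ? nodeBackoff : dataBackoff;
        }
    }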
I agree with the suggestions to rewrite, but I really think that is orthogonal to this simple (and correct) fix. I think we should commit this and keep the JIRA open for further refactoring.

- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33417/#review81097
-----------------------------------------------------------


On April 21, 2015, 10:51 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33417/
> -----------------------------------------------------------
> 
> (Updated April 21, 2015, 10:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2138
>     https://issues.apache.org/jira/browse/KAFKA-2138
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Patch for KAFKA-2138: honor retry backoff in KafkaProducer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
> 
> Diff: https://reviews.apache.org/r/33417/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
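The patch itself centers on making RecordAccumulator.ready() honor the retry backoff: a partition whose last batch recently failed should not be reported as ready until retry_backoff has elapsed. A hedged sketch of that check follows; the field and method names approximate the shape of the code at the time but are not quoted from the diff.

    // Hypothetical condensation of the readiness condition; BatchInfo is a
    // stand-in for the real RecordBatch.
    final class ReadyCheckSketch {

        static boolean sendable(BatchInfo batch, int dequeSize, long lingerMs,
                                long retryBackoffMs, boolean exhausted,
                                boolean closed, long nowMs) {
            // The fix: a batch that failed recently must wait out
            // retryBackoffMs before its partition counts as ready again.
            boolean backingOff = batch.attempts > 0
                    && batch.lastAttemptMs + retryBackoffMs > nowMs;
            boolean full = dequeSize > 1 || batch.full;
            boolean expired = nowMs - batch.createdMs >= lingerMs;
            return (full || expired || exhausted || closed) && !backingOff;
        }

        static final class BatchInfo {
            int attempts;
            long lastAttemptMs;
            long createdMs;
            boolean full;
        }
    }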