[ https://issues.apache.org/jira/browse/KAFKA-2142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14512637#comment-14512637 ]
Jiangjie Qin commented on KAFKA-2142: ------------------------------------- Created reviewboard https://reviews.apache.org/r/33552/diff/ against branch origin/trunk > Follow-up patch for KAFKA-2138 Refactor the drain message logic in new > producer > ------------------------------------------------------------------------------- > > Key: KAFKA-2142 > URL: https://issues.apache.org/jira/browse/KAFKA-2142 > Project: Kafka > Issue Type: Bug > Reporter: Jiangjie Qin > Assignee: Jiangjie Qin > Attachments: KAFKA-2142.patch > > > This is the follow up patch for KAFKA-2138. Currently the logic for sender to > drain message from accumulator is a little bit awkward, we want to refactor > it a little bit. Copy/Paste Guozhang's suggestion below: > {quote} > 1. while handle metadata response and update the metadata, check for ANY > partitions if their leader is not known; if there is set > metadata.requestUpdate. So we do not need to do this step anymore at the > start of run(). > 2. get all the ready nodes based on their connection state only (i.e. no > peeking in RecordAccumulator), and record the node_backoff as min > (reconnection_backoff - time_waited) of all nodes; if one of these node is > connected or connecting, this backoff should be 0. > 3. for each of ready nodes, try to drain their corresponding partitions > in RecordAccumulator while considering or kinds of conditions (full, expired, > exhausted, etc...), and record the data_backoff as min (retry_backoff - > time_waited) of all partitions; if one of the partitions is immediately > sendable, this backoff should be 0. > 4. formulate produce request and call client.poll() with timeout = > reconnection_backoff > 0 ? recconection_backoff : retry_backoff. > 5. in NetworkClient.poll(), the logic of "maybeUpdateMetadata" while > update metadataTimeout can also be simplified. > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)