> On May 4, 2015, 9:34 p.m., Guozhang Wang wrote:
> > Some general comments:
> > 
> > 1. This patch seems to be trying to fix multiple issues all at once, which 
> > makes it very hard to reason about / review. Could you separate it into 
> > multiple phases, with each phase resolving one issue at a time?
> > 
> > 2. It would be best not to leak network information such as the metadata 
> > timeout into the record accumulator, which is solely used for batching 
> > data and checking whether some batched data is ready to be sent 
> > (regardless of whether the leaders are available, ready, or some metadata 
> > needs a refresh, etc.).
> > 
> > 3. Some functions like partitionReady and batchReady are only called once 
> > in the code, and hence it may actually be clearer to merge them into one 
> > place, or to make their names more illustrative. For example, batchReady 
> > returns how long we should wait for the batch to be ready, while its name 
> > implies that it returns a boolean.

Hi Guozhang, since many of the issues and changes in this patch touch the 
same sections of code, it might be more efficient to do them in one patch 
instead of changing the same sections again and again. I added the list of 
changes I made to the RB description.
About the metadata timeout: I think a batch in the accumulator needs a 
timeout, whether it is metadata.timeout or some other timeout. In our case, 
the batch essentially times out because metadata is not available, so instead 
of adding another timeout configuration, I feel it is better to just reuse 
the metadata timeout. In another sense, we are already exposing metadata to 
the accumulator when we drain() data or check whether data is ready.
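
To illustrate (a rough sketch only, not the actual patch; the method name 
expireLeaderlessBatches, the parameter metadataTimeoutMs, and the fields 
createdMs/topicPartition are assumptions for illustration, with the usual 
java.util and org.apache.kafka.common imports implied):

    // Rough sketch: expire batches whose partition has had no known leader
    // for longer than the (reused) metadata timeout.
    private List<RecordBatch> expireLeaderlessBatches(Cluster cluster,
                                                      Deque<RecordBatch> deque,
                                                      long metadataTimeoutMs,
                                                      long nowMs) {
        List<RecordBatch> expired = new ArrayList<>();
        for (RecordBatch batch : deque) {
            // Leader unknown: metadata has no leader node for this partition.
            boolean leaderUnknown = cluster.leaderFor(batch.topicPartition) == null;
            if (leaderUnknown && nowMs - batch.createdMs > metadataTimeoutMs)
                expired.add(batch);
        }
        deque.removeAll(expired);
        return expired;
    }
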
Good suggestion about the function names; I've changed them.
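
For example, a name along these lines makes the returned wait time explicit 
(a hypothetical signature, not necessarily the one in the patch; attempts, 
lastAttemptMs, retryBackoffMs, and lingerMs are assumed names):

    // Hypothetical sketch: returns how many ms to wait before this batch is
    // ready to be sent (0 means ready now). A batch that has been attempted
    // before backs off for retry.backoff.ms; otherwise it waits out linger.ms.
    private long timeToWaitBeforeBatchReady(RecordBatch batch, long nowMs) {
        boolean backingOff = batch.attempts > 0;
        long waitMs = backingOff ? retryBackoffMs : lingerMs;
        long elapsedMs = nowMs - batch.lastAttemptMs;
        return Math.max(0, waitMs - elapsedMs);
    }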


> On May 4, 2015, 9:34 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 158
> > <https://reviews.apache.org/r/33552/diff/3/?file=948171#file948171line158>
> >
> >     Is this change intentional? If yes, what is the rationale behind it?

My original thinking was that if something went wrong, we should wait until 
the metadata gets updated instead of sending further data. I talked with 
Guozhang offline, and it looks like this approach has a flaw when a metadata 
request is sent to a broker that is down: the producer will wait for 
request.timeout without sending any further data. I'll revert this change.
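
Roughly, the change being reverted gated sends like this (a sketch; treat 
metadataFetchInProgress as an assumed field name):

    // Sketch of the reverted idea: hold back all sends whenever metadata is
    // stale or a metadata request is in flight. If that in-flight request
    // went to a dead broker, nothing is sent until request.timeout expires,
    // even to brokers that are perfectly healthy.
    if (metadataFetchInProgress || metadata.timeToNextUpdate(nowMs) == 0)
        return; // send no further data until metadata is updated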


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33552/#review82444
-----------------------------------------------------------


On May 5, 2015, 12:19 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33552/
> -----------------------------------------------------------
> 
> (Updated May 5, 2015, 12:19 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2142
>     https://issues.apache.org/jira/browse/KAFKA-2142
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Somehow lost the comments when using the kafka-patch-review tool... Adding 
> them back.
> 
> This patch tries to solve the following issues:
> 
> 1. Linger.ms is not honored - If partition A of node1 is not ready but 
> partition B is ready, data of partition A will also be sent out.
> 2. Retry backoff is not honored - A retry batch could wait for another 
> linger.ms instead of retry.backoff.ms.
> 
> Changed the following behavior:
> 1. Drain the data, then check when data will next be ready.
> 2. If a metadata refresh is needed or in progress, send no further data.
> 3. Only refresh metadata if we have a partition with data to send but we 
> don't know its leader.
> 
> Added the following behavior:
> 1. Expire a batch if its leader has been unknown for longer than the 
> metadata timeout.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java 
> 07f1cdb1fe920b0c7a5f2d101ddc40c689e1b247 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 42b12928781463b56fc4a45d96bb4da2745b6d95 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  49a98838767615dd952da20825f6985698137710 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> b2db91ca14bbd17fef5ce85839679144fff3f689 
>   clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
> 928087d29deb80655ca83726c1ebc45d76468c1f 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b278892883e63899b53e15efb9d8c926131e858 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
>   clients/src/test/resources/log4j.properties 
> b1d5b7f2b4091040bdcfb0a60fd58111179f45a0 
>   core/src/test/resources/log4j.properties 
> 1b7d5d8f7d5fae7d272849715714781cad05d77b 
> 
> Diff: https://reviews.apache.org/r/33552/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
