-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


Thanks for the patch. A few comments below.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139565>

    Could we add an example of how to use the new wakeup() call, especially 
with closing the consumer properly? For example, does the consumer thread just 
catch the ConsumerWakeupException and then call close()?
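
    For instance, the doc could show something along these lines. This is only 
a rough sketch of the pattern I have in mind, not the actual API: StubConsumer 
and WakeupSignal are hypothetical stand-ins for KafkaConsumer and 
ConsumerWakeupException, and only poll/wakeup/close are modeled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupSketch {
    // Hypothetical stand-in for the ConsumerWakeupException added by this patch.
    static class WakeupSignal extends RuntimeException {}

    // Hypothetical stand-in for the consumer; not the real KafkaConsumer.
    static class StubConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        volatile boolean closed = false;

        // Blocks for up to timeoutMs, but aborts early if wakeup() was called.
        void poll(long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                if (wakeupRequested.getAndSet(false))
                    throw new WakeupSignal();
                Thread.sleep(5);
            }
        }

        // Intended to be called from a thread other than the polling thread.
        void wakeup() {
            wakeupRequested.set(true);
        }

        void close() {
            closed = true;
        }
    }

    // Starts a polling thread, wakes it up, and reports whether it closed the consumer.
    static boolean runAndShutdown() {
        final StubConsumer consumer = new StubConsumer();
        final CountDownLatch done = new CountDownLatch(1);
        Thread poller = new Thread(() -> {
            try {
                while (true)
                    consumer.poll(100);
            } catch (WakeupSignal e) {
                // Expected on shutdown; fall through to close().
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                consumer.close();
                done.countDown();
            }
        });
        poller.start();
        consumer.wakeup();
        try {
            return done.await(5, TimeUnit.SECONDS) && consumer.closed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("closed cleanly: " + runAndShutdown());
    }
}
```

    The point to document is that close() is called from the polling thread 
itself, in a finally block, once the wakeup has interrupted poll().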



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139574>

    Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at the 
end of the call, we expect the fetch offset to be set to the beginning. This is 
now changed to async, which doesn't match the intended behavior. We need to 
think through if this matters or not.
    
    Ditto for seekToEnd().



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139577>

    Should we pass in tp to isOffsetResetNeeded()?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139509>

    The result returned by the offsetBefore call may already be ready, for 
example when a metadata refresh is needed. Since we don't check the ready state 
immediately afterward, we may delay handling the metadata refresh by up to the 
request timeout.
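
    Roughly what I mean, as a sketch only. StubResult, StubClient and this 
offsetBefore() are hypothetical stand-ins, not the classes in the patch; the 
only point is that the ready state is checked before we block in poll.

```java
public class ReadyCheckSketch {
    // Hypothetical stand-in for the delayed result type in this patch.
    static class StubResult {
        boolean ready;
        boolean needsMetadataRefresh;
    }

    // Hypothetical stand-in for the network client; it only counts blocking polls.
    static class StubClient {
        int polls = 0;

        void poll(long timeoutMs) {
            polls++;
        }
    }

    // Simulates a call that completes immediately when metadata is stale,
    // because no request can be sent in that case.
    static StubResult offsetBefore(boolean metadataStale) {
        StubResult result = new StubResult();
        if (metadataStale) {
            result.ready = true;
            result.needsMetadataRefresh = true;
        }
        return result;
    }

    // Returns how many blocking polls were needed before the result was usable.
    static int pollsNeeded(boolean metadataStale) {
        StubClient client = new StubClient();
        StubResult result = offsetBefore(metadataStale);
        // Check readiness first so an immediate completion is handled right away
        // instead of after a full poll timeout.
        if (!result.ready) {
            client.poll(1000);
            result.ready = true; // pretend the response arrived during the poll
        }
        return client.polls;
    }

    public static void main(String[] args) {
        System.out.println("stale metadata polls: " + pollsNeeded(true));
        System.out.println("fresh metadata polls: " + pollsNeeded(false));
    }
}
```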



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139564>

    Currently, our coding convention is not to wrap single-line statements with 
{}. There are a few other cases like this.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139544>

    In the async mode, the response may not be ready in the first iteration. 
Are we handling the retry properly in that case?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139545>

    We probably need to make some changes here when KAFKA-2120 is done to 
handle the request timeout properly. Perhaps we can add a TODO comment here.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment139563>

    -1 makes the pollClient block forever. So, we don't get a chance to do the 
wakeup check.
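
    One possible way around this, sketched below under my own assumptions: cap 
each blocking poll so that control returns periodically for the wakeup check. 
MAX_BLOCK_MS and boundedTimeout() are hypothetical names, and the 100 ms cap is 
arbitrary.

```java
public class PollTimeoutSketch {
    // Hypothetical upper bound on a single blocking poll, in milliseconds.
    static final long MAX_BLOCK_MS = 100L;

    // Maps a requested timeout to one that never blocks indefinitely.
    // A negative value (such as -1) is treated as "wait as long as needed",
    // which the caller achieves by looping over bounded polls.
    static long boundedTimeout(long requestedMs) {
        if (requestedMs < 0)
            return MAX_BLOCK_MS;
        return Math.min(requestedMs, MAX_BLOCK_MS);
    }

    public static void main(String[] args) {
        System.out.println(boundedTimeout(-1));
        System.out.println(boundedTimeout(50));
    }
}
```

    The caller would then loop, checking the wakeup flag between polls, until 
the result is ready.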



clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
<https://reviews.apache.org/r/34789/#comment139588>

    Do we need NONE?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment139579>

    We probably can make it clear that this is a non-blocking call and doesn't 
wait for the request to be sent or the response to be received. It would be 
good to do that on other similar methods too.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment139537>

    Should asynchronous be synchronous?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment139586>

    Similar to the above, it would be useful to make it clear that 
fetchOffsets() is non-blocking.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
<https://reviews.apache.org/r/34789/#comment139534>

    Perhaps we can define a static method to initialize the constant and set the 
state. It's clearer that way since the instantiation and the initialization are 
in the same place. With this, we probably don't need the static getter methods 
and can just let the caller use the static constants directly.
    
    Ditto for BrokerResult.
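
    Something like the following, as a sketch. Result, completedWith() and the 
constant and remedy names here are hypothetical and only illustrate the shape; 
they are not taken from the patch.

```java
public class StaticResultSketch {
    // Hypothetical, simplified stand-in for CoordinatorResult.
    static class Result {
        private final boolean ready;
        private final String remedy; // null when there is no remedy

        // Each constant is created and put into its final state in one place,
        // so callers can reference the constants directly.
        static final Result NEED_NEW_COORDINATOR = completedWith("FIND_COORDINATOR");
        static final Result NEED_RETRY = completedWith("RETRY");

        private Result(boolean ready, String remedy) {
            this.ready = ready;
            this.remedy = remedy;
        }

        // Static method that both instantiates and initializes a completed result.
        private static Result completedWith(String remedy) {
            return new Result(true, remedy);
        }

        boolean isReady() {
            return ready;
        }

        String remedy() {
            return remedy;
        }
    }

    public static void main(String[] args) {
        System.out.println(Result.NEED_RETRY.remedy());
    }
}
```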



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
<https://reviews.apache.org/r/34789/#comment139540>

    The comment seems inaccurate. We are not returning an error, but returning 
a remedy instead.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
<https://reviews.apache.org/r/34789/#comment139504>

    Do we need both hasRemedy and hasException? It seems that if one returns 
true, the other should always return false.
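
    If the two are indeed mutually exclusive, a single state field might 
express that more directly. A sketch under that assumption follows; the Outcome 
enum and the method names are hypothetical, not the ones in DelayedResult.

```java
public class OutcomeSketch {
    // A result is in exactly one of these states at any time.
    enum Outcome { PENDING, VALUE, REMEDY, EXCEPTION }

    // Hypothetical, simplified stand-in for DelayedResult.
    static class Result<T> {
        private Outcome outcome = Outcome.PENDING;
        private T value;
        private RuntimeException exception;

        void complete(T value) {
            this.value = value;
            this.outcome = Outcome.VALUE;
        }

        void needRemedy() {
            this.outcome = Outcome.REMEDY;
        }

        void raise(RuntimeException exception) {
            this.exception = exception;
            this.outcome = Outcome.EXCEPTION;
        }

        Outcome outcome() {
            return outcome;
        }

        boolean isReady() {
            return outcome != Outcome.PENDING;
        }

        T value() {
            return value;
        }

        RuntimeException exception() {
            return exception;
        }
    }

    public static void main(String[] args) {
        Result<Long> result = new Result<>();
        result.needRemedy();
        System.out.println(result.outcome());
    }
}
```

    Callers would then switch on outcome() instead of checking two booleans.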



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
<https://reviews.apache.org/r/34789/#comment139578>

    To be consistent with the naming convention of the rest of the methods, 
should we just name it offsetResetNeeded()?


- Jun Rao


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  cee75410127dd1b86c1156563003216d93a086b3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> 677edd385f35d4262342b567262c0b874876d25b 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
