-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
-----------------------------------------------------------


I browsed through the patch; overall it looks very promising.

A few of the detailed changes are not clear to me, though:

1. The request future adapter / handler modifications.
2. The retry backoff implementation does not seem correct.

Could you explain these two aspects a little more?


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 (line 27)
<https://reviews.apache.org/r/36333/#comment144213>

    You may want to pass the committed offset map to the callback; otherwise
    it is unclear which commit the callback refers to when it is triggered.
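
    A minimal sketch of what this could look like. The names below
    (OffsetCommitCallbackSketch, CommitDemo) are hypothetical and not taken
    from the patch, and partitions are plain strings instead of
    TopicPartition so that the example stays self-contained:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical callback shape for illustration; not the interface in the patch.
interface OffsetCommitCallbackSketch {
    // offsets: the offsets this particular commit tried to write.
    // exception: null on success, otherwise the cause of the failure.
    void onComplete(Map<String, Long> offsets, Exception exception);
}

final class CommitDemo {
    // Stand-in for an asynchronous commit: it completes immediately and
    // hands the callback a read-only snapshot of the committed offsets.
    static void commitAsync(Map<String, Long> offsets, OffsetCommitCallbackSketch callback) {
        Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(offsets));
        callback.onComplete(snapshot, null);
    }
}
```

    Passing a snapshot means the callback still sees the offsets of its own
    commit even if the caller keeps updating the original map afterwards.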



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 542 - 545)
<https://reviews.apache.org/r/36333/#comment144216>

    Is there any particular reason to materialize the sessionTimeoutMs
    variable? It seems to be referenced only once, at line 545.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 765)
<https://reviews.apache.org/r/36333/#comment144219>

    I think KAFKA-1894 is already fixed by this patch together with KAFKA-2168?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 786)
<https://reviews.apache.org/r/36333/#comment144233>

    Could you add a comment here, e.g. "re-schedule the commit task for the
    next commit interval"?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 88 - 93)
<https://reviews.apache.org/r/36333/#comment144252>

    This is not introduced in this patch, but I am not sure this is the right
    way to respect the backoff time. For example, if the destination broker is
    down for a short period of time, poll(retryBackoffMs) will return
    immediately, and hence this function will busy-loop on poll() and flood
    the network with metadata requests, right?

    What we want in this case is for the consumer to wait for retryBackoffMs
    before sending the next metadata request.
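
    A rough sketch of the behavior I have in mind. BackoffRetry and its
    parameters are hypothetical and only illustrate the idea; this is not
    code from ConsumerNetworkClient. The clock and the sleeper are passed in
    so that the wait can be faked in a test:

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical helper for illustration only; not part of the patch or of Kafka.
final class BackoffRetry {
    /**
     * Runs attempt until it succeeds or maxAttempts is reached and returns the
     * number of attempts made. If a failed attempt returned sooner than
     * retryBackoffMs, the remainder is spent in sleeper before the next attempt.
     */
    static int retry(BooleanSupplier attempt, long retryBackoffMs, int maxAttempts,
                     LongSupplier clockMs, LongConsumer sleeper) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            long start = clockMs.getAsLong();
            attempts++;
            if (attempt.getAsBoolean())
                return attempts;
            // A failed attempt may return immediately (e.g. broker down), so
            // wait out whatever is left of the backoff window before retrying.
            long remaining = retryBackoffMs - (clockMs.getAsLong() - start);
            if (remaining > 0 && attempts < maxAttempts)
                sleeper.accept(remaining);
        }
        return attempts;
    }
}
```

    In real code the clock could be System::currentTimeMillis and the sleeper
    a wrapper around Thread.sleep; the point is only that a poll() which
    returns early must not shorten the backoff.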



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 119)
<https://reviews.apache.org/r/36333/#comment144250>

    This function can be private.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 135 - 138)
<https://reviews.apache.org/r/36333/#comment144253>

    Same as above in awaitMetadataUpdate().



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 114)
<https://reviews.apache.org/r/36333/#comment144275>

    We can remove this line since it is checked inside ensureAssignment() 
already.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 137)
<https://reviews.apache.org/r/36333/#comment144239>

    How about renaming this to "ensurePartitionAssigned"?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 212)
<https://reviews.apache.org/r/36333/#comment144276>

    There is a potential risk that the heartbeat schedule is not aligned with
    the discovery of the coordinator. For example:

    1. At t0 we call initHeartbeatTask with interval 100.
    2. At t1 the consumer has already found the coordinator, but it will not
       send the first HB until t100.
    3. At t100 the consumer may find that it has already been kicked out of
       the group by the coordinator; it reschedules the HB for t200 and
       re-joins the group.
    4. At t101 the consumer has re-joined the group, but it will not send the
       HB until t200, and so on.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 280)
<https://reviews.apache.org/r/36333/#comment144277>

    Why do we change the behavior to throw an exception directly here?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 (line 22)
<https://reviews.apache.org/r/36333/#comment144246>

    This comment is not correct since the function returns void.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 (lines 1 - 43)
<https://reviews.apache.org/r/36333/#comment144278>

    It is not clear to me why you want to convert the future type in this
    adapter. Could you elaborate a bit?


- Guozhang Wang


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
> 
> (Updated July 8, 2015, 9:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123; resolve problems from rebase
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> fd98740bff175cc9d5bc02e365d88e011ef65d22 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 7aa076084c894bb8f47b9df2c086475b06f47060 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> 46e26a665a22625d50888efa7b53472279f36e79 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  695eaf63db9a5fa20dc2ca68957901462a96cd96 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  51eae1944d5c17cf838be57adf560bafe36fbfbd 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  683745304c671952ff566f23b5dd4cf3ab75377a 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 4c0ecc3badd99727b5bd9d430364e61c184e0923 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  d085fe5c9e2a0567893508a1c71f014fae6d7510 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  405efdc7a59438731cbc3630876bda0042a3adb3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ee1ede01efa070409b86f5d8874cd578e058ce51 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
>  PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 476973b2c551db5be3f1c54f94990f0dd15ff65e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
> 
> Diff: https://reviews.apache.org/r/36333/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
