-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 91)
<https://reviews.apache.org/r/36333/#comment144254>

    Is the retryBackoffMs here because that's how long you want to wait for a 
response before we jump back out of the poll, which fails the request and 
triggers another NetworkClient.poll() call, which in turn sends the 
MetadataRequest?
    
    Just trying to figure out the flow because a) the flow is unclear and b) 
this is the only use of metadata in this class. Would this make sense to push 
into NetworkClient, which is the one responsible for making the request anyway? 
I'm not sure it's a good idea since nothing besides this class uses 
NetworkClient anymore, but worth thinking about.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 121)
<https://reviews.apache.org/r/36333/#comment144258>

    Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() 
can be arbitrarily small. For delayedTasks especially, it seems like we're 
tying the failure of requests to unrelated events?
    
    Previously, I thought request failures could happen quickly, but then there 
was a Utils.sleep backoff. I don't see how that is handled now. If the 
connection isn't established, won't poll() skip sending the requests, run 
client.poll(), possibly return very quickly before the connection is 
established or fails, fail the unsent requests, and then just return to the 
caller? That caller might be one of those while() loops, so it may just 
keep retrying, busy looping without actually accomplishing anything.
    
    If we're going to introduce queuing of send requests here, it seems like a 
timeout (rather than fast fail + backoff) might be a more natural solution. So 
rather than clearUnsentRequests, it might be clearExpiredRequests.
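    
    To make the timeout idea concrete, here is a rough sketch of what I have 
in mind. Everything in it is hypothetical (the UnsentRequestQueue class, the 
String stand-in for a request, the requestTimeoutMs parameter); it is not code 
from the patch, just an illustration of expiring queued requests by age 
instead of failing them all after every poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: none of these names come from the patch under review.
class UnsentRequestQueue {
    // A queued request together with the time at which it was enqueued.
    static final class Entry {
        final String request;
        final long enqueuedMs;

        Entry(String request, long enqueuedMs) {
            this.request = request;
            this.enqueuedMs = enqueuedMs;
        }
    }

    private final Deque<Entry> unsent = new ArrayDeque<>();
    private final long requestTimeoutMs;

    UnsentRequestQueue(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    void enqueue(String request, long nowMs) {
        unsent.add(new Entry(request, nowMs));
    }

    // Removes and returns only the requests that have waited at least
    // requestTimeoutMs; younger requests stay queued for the next poll.
    List<String> clearExpiredRequests(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = unsent.iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (nowMs - entry.enqueuedMs >= requestTimeoutMs) {
                expired.add(entry.request);
                it.remove();
            }
        }
        return expired;
    }

    int size() {
        return unsent.size();
    }
}
```

    With something like this, a short poll timeout or an early delayed task 
would not by itself fail a request; only the request's own age would.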



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 113)
<https://reviews.apache.org/r/36333/#comment144273>

    I think this is a holdover from previous code, so feel free to defer to 
another JIRA. The code in KafkaConsumer.committed() implies it should work for 
partitions not assigned to this consumer (and that is useful functionality we 
should probably support). But here, the argument is ignored in the condition 
for the while() loop, which seems error-prone. I don't see a way that 
committed() would cause subscriptions.refreshCommitsNeeded to return true, so I 
think it'll currently fail for unassigned partitions.
    
    For this code, I think it's risky not to specifically be checking the 
requested partitions against the available set in subscriptions unless the 
caller is already guaranteed to do that.
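    
    The kind of explicit check I mean could look roughly like the sketch 
below. The PartitionCheck class and the use of plain strings for partitions are 
assumptions made for illustration only; the real code would work with 
TopicPartition and whatever SubscriptionState exposes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of the patch: partitions are modelled as
// plain strings such as "topic-0" to keep the sketch self-contained.
class PartitionCheck {
    // Returns the requested partitions that are not in the assigned set,
    // so the caller can decide how to handle them explicitly.
    static Set<String> unassigned(Set<String> requested, Set<String> assigned) {
        Set<String> missing = new HashSet<>(requested);
        missing.removeAll(assigned);
        return missing;
    }
}
```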



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 137)
<https://reviews.apache.org/r/36333/#comment144272>

    This being separated from the rebalance callbacks concerns me a little, 
specifically because we're adding user callbacks and those can potentially make 
additional calls on the consumer. I scanned through all the usages and can 
currently only find one weird edge case that could create incorrect behavior:
    
    1. Do async offset commit
    2. During normal poll(), offset commit finishes and callback is invoked.
    3. Callback calls committed(TopicPartition), which invokes 
refreshCommittedOffsets, which invokes ensureAssignment and handles the 
rebalance without invoking callbacks.
    
    Actually, I think based on my earlier comment this won't currently happen, 
but only because I think committed() won't work properly with non-assigned 
partitions.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 298)
<https://reviews.apache.org/r/36333/#comment144274>

    Why trigger commit refresh here?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 526)
<https://reviews.apache.org/r/36333/#comment144281>

    Stray unnecessary "Coordinator." prefix.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 640)
<https://reviews.apache.org/r/36333/#comment144280>

    Is this really what should be happening? I'm trying to figure out how 
we can even reach this code since RequestFutureCompletionHandler only has 
onComplete -> RequestFuture.complete(), which indicates success.
    
    This does need to handle both cases, but can we validly get to onFailure 
the way this class is used, or should this be some stronger assertion?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 (line 74)
<https://reviews.apache.org/r/36333/#comment144226>

    Since we bumped to Java 1.7 you can use Long.compare(this.timeout, 
entry.timeout).
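    
    For example, a compareTo along these lines (TimeoutEntry is a made-up 
stand-in for the queue entry here; only the timeout field is assumed):

```java
// Hypothetical stand-in for the queue entry; only the timeout field is assumed.
class TimeoutEntry implements Comparable<TimeoutEntry> {
    final long timeout;

    TimeoutEntry(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public int compareTo(TimeoutEntry entry) {
        // Long.compare (available since Java 7) avoids hand-written branching
        // and the overflow risk of subtracting two longs.
        return Long.compare(this.timeout, entry.timeout);
    }
}
```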



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 (line 22)
<https://reviews.apache.org/r/36333/#comment144283>

    Related to previous comment about immediately failing sends vs. timeouts in 
ConsumerNetworkClient -- with the latter approach, we don't need a separate 
exception here, we could just use TimeoutException.
    
    Also, are these new exceptions that are in o.a.k.clients.consumer.internals 
guaranteed not to be thrown and escape the consumer? And if so, is that just 
because they inherit from RetriableException?



clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 
<https://reviews.apache.org/r/36333/#comment144308>

    Why did this last assertion for the assigned partition get dropped?



clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 (line 302)
<https://reviews.apache.org/r/36333/#comment144309>

    The comment should say disconnected, not 'not coordinator'.



clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 (line 357)
<https://reviews.apache.org/r/36333/#comment144310>

    I think there are a few other scenarios not tested here (or in 
ConsumerTest). For example, a non-RetriableException should throw if there's no 
callback, set cb.exception if there is a callback.
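    
    As a rough model of the behavior I would expect those tests to pin down 
(all names below are hypothetical and this is not the Coordinator API, just 
the two branches spelled out):

```java
// Hypothetical model of the expected behaviour; not the Coordinator API.
class CommitOutcomeSketch {
    // Stand-in for a commit callback that records the exception it receives.
    static final class RecordingCallback {
        Exception exception;

        void onComplete(Exception e) {
            this.exception = e;
        }
    }

    // Delivers a non-retriable failure: thrown to the caller when there is
    // no callback, otherwise handed to the callback without throwing.
    static void deliverFailure(RuntimeException failure, RecordingCallback cb) {
        if (cb == null)
            throw failure;
        cb.onComplete(failure);
    }
}
```

    A test would then cover both branches: with a callback, assert that 
cb.exception is set and nothing is thrown; without one, assert that the 
exception propagates.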


- Ewen Cheslack-Postava


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
> 
> (Updated July 8, 2015, 9:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123; resolve problems from rebase
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> fd98740bff175cc9d5bc02e365d88e011ef65d22 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 7aa076084c894bb8f47b9df2c086475b06f47060 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> 46e26a665a22625d50888efa7b53472279f36e79 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  695eaf63db9a5fa20dc2ca68957901462a96cd96 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  51eae1944d5c17cf838be57adf560bafe36fbfbd 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  683745304c671952ff566f23b5dd4cf3ab75377a 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 4c0ecc3badd99727b5bd9d430364e61c184e0923 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  d085fe5c9e2a0567893508a1c71f014fae6d7510 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  405efdc7a59438731cbc3630876bda0042a3adb3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ee1ede01efa070409b86f5d8874cd578e058ce51 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
>  PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 476973b2c551db5be3f1c54f94990f0dd15ff65e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
> 
> Diff: https://reviews.apache.org/r/36333/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
