-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
-----------------------------------------------------------

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 91)
<https://reviews.apache.org/r/36333/#comment144254>

Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest?

Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would it make sense to push this into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but it's worth thinking about.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 121)
<https://reviews.apache.org/r/36333/#comment144258>

Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events.

Previously, I thought request failures could happen quickly, but then there was a Utils.sleep backoff. I don't see how that is handled now. If the connection isn't established, won't poll() skip sending the requests, run client.poll(), possibly return very quickly before the connection is established or fails, then fail the unsent requests, and just return to the caller? That caller might be one of those while() loops, so it may just keep retrying, busy looping without actually accomplishing anything.

If we're going to introduce queuing of send requests here, a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests.

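A minimal sketch of the clearExpiredRequests idea from the line 121 comment, under stated assumptions. None of this is code from the patch: UnsentRequestQueue, Unsent, requestTimeoutMs, and the String request payload are hypothetical stand-ins, and the real queue in ConsumerNetworkClient may well be organized differently (for example, per node). The point is only that a queued request fails once it has waited out its own timeout, rather than whenever poll() happens to return.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch only; not the ConsumerNetworkClient implementation.
final class UnsentRequestQueue {

    // Hypothetical holder: a request payload plus the time it was queued.
    static final class Unsent {
        final String request;
        final long enqueuedMs;

        Unsent(String request, long enqueuedMs) {
            this.request = request;
            this.enqueuedMs = enqueuedMs;
        }
    }

    private final Deque<Unsent> unsent = new ArrayDeque<>();
    private final long requestTimeoutMs; // assumed per-request timeout

    UnsentRequestQueue(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Queue a request that could not be sent yet, remembering when it was queued.
    void add(String request, long nowMs) {
        unsent.add(new Unsent(request, nowMs));
    }

    // Remove and return only the requests that have waited at least
    // requestTimeoutMs. Younger requests stay queued for a later poll.
    List<String> clearExpiredRequests(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Unsent> it = unsent.iterator();
        while (it.hasNext()) {
            Unsent u = it.next();
            if (nowMs - u.enqueuedMs >= requestTimeoutMs) {
                it.remove();
                expired.add(u.request);
            }
        }
        return expired;
    }

    // Number of requests still waiting to be sent.
    int size() {
        return unsent.size();
    }
}
```

With this shape, a poll() that returns early because of an unrelated delayed task fails nothing: only requests older than the timeout are handed back to be failed, presumably with a TimeoutException.
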
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 113)
<https://reviews.apache.org/r/36333/#comment144273>

I think this is a holdover from previous code, so feel free to defer it to another JIRA.

The code in KafkaConsumer.committed() implies it should work for partitions not assigned to this consumer (and that is useful functionality we should probably support). But here, the argument is ignored in the condition for the while() loop, which seems error prone. I don't see a way that committed() would cause subscriptions.refreshCommitsNeeded to return true, so I think it'll currently fail for unassigned partitions.

For this code, I think it's risky not to check the requested partitions specifically against the available set in subscriptions, unless the caller is already guaranteed to do that.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 137)
<https://reviews.apache.org/r/36333/#comment144272>

This being separated from the rebalance callbacks concerns me a little, specifically because we're adding user callbacks and those can potentially make additional calls on the consumer. I scanned through all the usages and can currently find only one weird edge case that could create incorrect behavior:

1. Do async offset commit.
2. During normal poll(), the offset commit finishes and the callback is invoked.
3. The callback calls committed(TopicPartition), which invokes refreshCommittedOffsets, which invokes ensureAssignment and handles the rebalance without invoking callbacks.

Actually, based on my earlier comment, I think this won't currently happen, but only because committed() won't work properly with non-assigned partitions.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 298)
<https://reviews.apache.org/r/36333/#comment144274>

Why trigger commit refresh here?

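For the Coordinator.java (line 113) comment above, a rough sketch of the kind of explicit check meant by "checking the requested partitions against the available set". This is illustrative only: PartitionCheck and unassigned() are hypothetical names, and partitions are modeled as plain strings rather than TopicPartition so the snippet stands alone. It does not reflect how SubscriptionState actually exposes the assignment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; not part of the patch or of SubscriptionState.
final class PartitionCheck {

    private PartitionCheck() {
    }

    // Returns the requested partitions that are not in the assigned set.
    // An empty result means every requested partition is assigned.
    static Set<String> unassigned(Set<String> requested, Set<String> assigned) {
        Set<String> missing = new HashSet<>(requested);
        missing.removeAll(assigned);
        return missing;
    }
}
```

A caller could then either fetch committed offsets for the missing partitions directly or reject the request, instead of looping on a condition that never mentions the argument.
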
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 526)
<https://reviews.apache.org/r/36333/#comment144281>

Stray unnecessary "Coordinator." prefix.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 640)
<https://reviews.apache.org/r/36333/#comment144280>

Is this really what should be happening? I'm trying to figure out how we can even reach this code, since RequestFutureCompletionHandler only has onComplete -> RequestFuture.complete(), which indicates success.

This does need to handle both cases, but can we validly get to onFailure the way this class is used, or should this be some stronger assertion?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java (line 74)
<https://reviews.apache.org/r/36333/#comment144226>

Since we bumped to Java 1.7, you can use Long.compare(this.timeout, entry.timeout).

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java (line 22)
<https://reviews.apache.org/r/36333/#comment144283>

Related to the previous comment about immediately failing sends vs. timeouts in ConsumerNetworkClient -- with the latter approach, we don't need a separate exception here; we could just use TimeoutException.

Also, are these new exceptions in o.a.k.clients.consumer.internals guaranteed never to be thrown out of the consumer to the caller? And if so, is that just because they inherit from RetriableException?

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
<https://reviews.apache.org/r/36333/#comment144308>

Why did this last assertion for the assigned partition get dropped?

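For the DelayedTaskQueue.java (line 74) comment above, a small sketch of a compareTo built on Long.compare, which has been available since Java 7. TimeoutEntry is a hypothetical stand-in for the queue's entry class; only the timeout field name and the Long.compare call are taken from the comment.

```java
// Hypothetical stand-in for the entry class in DelayedTaskQueue.
final class TimeoutEntry implements Comparable<TimeoutEntry> {
    final long timeout;

    TimeoutEntry(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public int compareTo(TimeoutEntry entry) {
        // Negative, zero, or positive depending on how the two timeouts order.
        return Long.compare(this.timeout, entry.timeout);
    }
}
```

Besides being shorter than a hand-written if/else chain, Long.compare is safe for widely separated values, where a subtraction-based comparison can overflow. A java.util.PriorityQueue of such entries returns the smallest timeout first.
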
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (line 302)
<https://reviews.apache.org/r/36333/#comment144309>

The comment should say "disconnected", not "not coordinator".

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (line 357)
<https://reviews.apache.org/r/36333/#comment144310>

I think there are a few other scenarios not tested here (or in ConsumerTest). For example, a non-RetriableException should be thrown if there's no callback, and should set cb.exception if there is a callback.

- Ewen Cheslack-Postava

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
>
> (Updated July 8, 2015, 9:19 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2123; resolve problems from rebase
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a
>   clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9
>
> Diff: https://reviews.apache.org/r/36333/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>