-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
-----------------------------------------------------------

This looks good so far. I think it's much easier to understand when all the blocking happens at the KafkaConsumer level and each of the classes it uses only ever handles single requests. It'd be nice to document the basic architecture somewhere since it took me a bit to fully figure it out. (Unfortunately, since the javadocs for the consumer are in the implementation class KafkaConsumer instead of on the Consumer interface, we can't put this with the KafkaConsumer class...)

Some notes in addition to the inline stuff:

Some functionality has been pulled back up to KafkaConsumer, in a mild reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you couldn't figure out a way to keep these in Fetcher since the inner call to offsetBefore() requires that blocking loop?

Some of the handling of DelayedResponse and its subclasses seems redundant, or at least follows a common pattern, and maybe could be refactored into utility code. However, there are few enough places where it's happening now that I don't think it's a big deal. It does seem a bit wasteful that we have to continually create these DelayedResponse objects even in cases where we know we'll fail fast, but I suppose those cases should be unusual and the cost to allocate them isn't all that high.

Finally, a readability/cleanliness thing. This patch adds more nested anonymous RequestCompletionHandler classes. I think these are fine as they are, but if the implementations get too long or branchy with all the various error conditions, they can become unreadably over-indented. Taking some of the big ones and using named nested classes might help improve clarity, although it does separate the request-initiating code from the response-handling code.

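To make the last two points a bit more concrete, here is a rough sketch of the shape I have in mind: one shared blocking helper that loops until a delayed result is ready, plus a named nested handler in place of an anonymous one. Every name below (DelayedResult, CompletionHandler, FakeClient, OffsetHandler, awaitResult) is made up for illustration and is not a class or method from the patch; FakeClient just stands in for the network client so the example runs on its own.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch only: none of these types are the ones in the patch.
class DelayedResultSketch {

    /** Future-like holder: becomes ready with either a value or an error. */
    static final class DelayedResult<T> {
        private boolean ready;
        private T value;
        private RuntimeException error;

        void complete(T value) { this.value = value; this.ready = true; }
        void fail(RuntimeException error) { this.error = error; this.ready = true; }
        boolean isReady() { return ready; }
        boolean succeeded() { return ready && error == null; }
        T value() { return value; }
        RuntimeException error() { return error; }
    }

    /** Hypothetical stand-in for a request completion callback. */
    interface CompletionHandler {
        void onComplete(String responseBody);
    }

    /** Hypothetical stand-in for the network client: poll() delivers one queued response. */
    static final class FakeClient {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void send(String request, CompletionHandler handler) {
            // The fake always answers with the same canned body.
            pending.add(() -> handler.onComplete("offset=42"));
        }

        void poll() {
            Runnable next = pending.poll();
            if (next != null)
                next.run();
        }
    }

    /** Named nested handler: the error branches stay at one indentation level. */
    static final class OffsetHandler implements CompletionHandler {
        private final DelayedResult<Long> result;

        OffsetHandler(DelayedResult<Long> result) { this.result = result; }

        @Override
        public void onComplete(String responseBody) {
            if (responseBody == null || !responseBody.startsWith("offset=")) {
                result.fail(new IllegalStateException("unexpected response: " + responseBody));
                return;
            }
            result.complete(Long.parseLong(responseBody.substring("offset=".length())));
        }
    }

    /** The single blocking loop, shared by every caller that needs to wait. */
    static <T> T awaitResult(FakeClient client, DelayedResult<T> result, int maxPolls) {
        for (int i = 0; i < maxPolls && !result.isReady(); i++)
            client.poll();
        if (!result.isReady())
            throw new IllegalStateException("no response after " + maxPolls + " polls");
        if (!result.succeeded())
            throw result.error();
        return result.value();
    }

    /** Caller side: initiate one request, then block in the shared helper. */
    static long fetchOffset(FakeClient client) {
        DelayedResult<Long> result = new DelayedResult<>();
        client.send("offset-request", new OffsetHandler(result));
        return awaitResult(client, result, 10);
    }

    public static void main(String[] args) {
        System.out.println(fetchOffset(new FakeClient()));
    }
}
```

The tradeoff is the one already mentioned: the handler body now lives away from the send() call, but each handler can be read and tested on its own, and the wait/check/unwrap sequence is written once.
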
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138296>

This seems like it'll be analogous to KIP-19's request.timeout.ms since it looks like all the use cases are sending a single request and waiting for a single response. I don't think there's any patch ready for that yet (check w/ the JIRA and maybe Jiangjie has something that hasn't been submitted yet), but if that ends up accepted we could potentially add that flag in either patch.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138299>

Might want to move this to trace. Normal consumers are going to hit this *a lot*.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138304>

I think this is simpler and clearer if you reorder these two -- reset the offsets that need it first, then update fetch positions for the ones that are missing them. I think this also removes the need for the extra conditional !subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138305>

Any reason not to do this? It definitely seems like it'd be necessary, although the logic might be more complicated than just moving maybeHeartbeat() in here -- some of the other lead-up to this could change if you saw a partition reassignment in the middle of a long poll, requiring fetch positions to be updated, etc.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138320>

Besides grouping by node, we could also send these requests out in parallel. The drawback to simplifying this all to use a series of offsetBefore() calls is that each one blocks, so resetting a bunch of offsets is going to be pretty slow.
The obvious solution is a utility that lets you run a bunch of requests in parallel, then does the same looping you're doing while waiting for a single response, but handles a bunch all at once.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138324>

Since poll() can trigger auto offset commits, and the commits can then block while polling for some time, can we end up recursing in some bad situations, e.g. if we consistently cannot get a coordinator? We might need to keep track of whether a commit is outstanding and not try to commit again, or just update the values we're trying to commit.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138323>

In async mode, why bother polling at all here? You mentioned coordinator connection issues in the comment below, but if we need to handle that here, aren't we effectively making this a synchronous request? Or semi-synchronous, since it has to wait for the coordinator? Or is this best left to a follow-up, since there were other issues with async (we need callbacks, and there were other ordering issues in the implementation)?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
<https://reviews.apache.org/r/34789/#comment138326>

The classes named XResponse may be a bit confusing because the protocol responses use that terminology. Future? Result?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
<https://reviews.apache.org/r/34789/#comment138327>

Also, it's probably worth adding some javadocs to explain how these classes work -- similar to a future, but the one potential request may or may not be made, and the first issue it encounters causes the Delayed/Broker/CoordinatorResponse to become ready with some sort of result.
That result could be one of a small set of issues for Broker/CoordinatorResponses, or the value obtained from a valid response from the broker.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138294>

You can get rid of this -- metadata is no longer used in this class. Nice work, that should help make the layering of these classes cleaner.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138331>

Seems like the blockingCoordinatorRequest/initiateCoordinatorRequest logic had to be expanded. This same basic code seems to be duplicated in multiple places now. Any way to avoid the duplication? I see that there's now some code in the middle of those blocks that differs (the code that used to come after the blocking request finished), but these are fairly large chunks of code.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138333>

The second half of this description no longer matches since there is no blocking commit here anymore.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138337>

time.milliseconds() is called 3x here; looks like you should be able to call it once and use the same value.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
<https://reviews.apache.org/r/34789/#comment138339>

If we need this enum across multiple classes, maybe it should be its own standalone type instead of nested in one of them.

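For the enum comment, something along these lines is what I'm picturing: the enum gets its own top-level type and both users refer to it directly. ResetStrategy, StateSketch, and FetcherSketch are placeholder names rather than the ones in the patch, and I'm assuming for the sake of the example that the enum describes how an offset should be reset.

```java
// Placeholder names throughout; a sketch, not the patch's actual types.

/** Top-level enum, so neither class has to reach into the other for it. */
enum ResetStrategy { EARLIEST, LATEST }

/** Stand-in for the state-tracking class: records which reset is pending. */
final class StateSketch {
    private ResetStrategy pending; // null when no reset is pending

    void requestReset(ResetStrategy strategy) { this.pending = strategy; }
    boolean isResetPending() { return pending != null; }
    ResetStrategy pendingReset() { return pending; }
    void clearReset() { this.pending = null; }
}

/** Stand-in for the class that turns a strategy into a concrete offset. */
final class FetcherSketch {
    static long resolve(ResetStrategy strategy, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST: return earliest;
            case LATEST: return latest;
            default: throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }
}

class EnumSketchDemo {
    public static void main(String[] args) {
        StateSketch state = new StateSketch();
        state.requestReset(ResetStrategy.LATEST);
        if (state.isResetPending()) {
            // Both classes use ResetStrategy without importing each other's internals.
            System.out.println(FetcherSketch.resolve(state.pendingReset(), 0L, 100L));
            state.clearReset();
        }
    }
}
```
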
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
<https://reviews.apache.org/r/34789/#comment138301>

This naming threw me off when I was looking at callers. At first I thought offsetResetNeeded() was checking the value and offsetResetNeeded(TopicPartition) was setting a flag that the TopicPartition needed its offsets reset. Maybe rename these to isOffsetResetNeeded to clarify, leaving needOffsetReset() to set the flags?


- Ewen Cheslack-Postava


On June 3, 2015, 12:10 a.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
>
> (Updated June 3, 2015, 12:10 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
>
> Diff: https://reviews.apache.org/r/34789/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>