> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 1091
> > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1091>
> >
> > Since poll() can trigger auto offset commits, and then the commits can
> > block while polling() for some time, can we end up recursing in some bad
> > situations, e.g. if we consistently cannot get a coordinator?
> >
> > We might need to keep track if a commit is outstanding and not try to
> > commit again, or just update the values we're trying to commit.
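
As a rough illustration of the "keep track if a commit is outstanding" option from the comment above, here is a minimal sketch. It is not the KafkaConsumer code under review: the class GuardedAutoCommit, its fields, the fake clock, and the fixed number of inner polls are all hypothetical stand-ins, chosen only to show how a flag could stop a poll() issued from inside a commit from starting a second commit.

```java
/**
 * Hypothetical sketch of a re-entrancy guard for auto-commit.
 * None of these names come from the patch; they are assumptions for illustration.
 */
class GuardedAutoCommit {
    private final long intervalMs;     // auto-commit interval
    private long nextCommitMs;         // fake-clock time at which the next auto-commit is due
    private long clockMs;              // fake clock, advanced by each poll
    private boolean commitInProgress;  // the guard: true while a commit is on the stack
    private int depth;                 // current nesting depth of commit calls

    int commitCalls;                   // how many commits were started
    int maxCommitDepth;                // deepest nesting observed (1 means no recursion)

    GuardedAutoCommit(long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextCommitMs = intervalMs;
    }

    /** Stand-in for poll(timeout): advance the clock, then auto-commit if due. */
    void poll(long timeoutMs) {
        clockMs += timeoutMs;
        // Without the commitInProgress check, the polls issued inside commit()
        // would re-enter commit() every time the interval has elapsed.
        if (clockMs >= nextCommitMs && !commitInProgress) {
            commit(3);
        }
    }

    /** Stand-in for commitOffsets: polls a few times while "waiting" for a slow coordinator. */
    private void commit(int pollsUntilReady) {
        commitInProgress = true;
        commitCalls++;
        depth++;
        maxCommitDepth = Math.max(maxCommitDepth, depth);
        try {
            for (int i = 0; i < pollsUntilReady; i++) {
                poll(intervalMs); // each inner poll crosses another commit interval
            }
            nextCommitMs = clockMs + intervalMs;
        } finally {
            depth--;
            commitInProgress = false;
        }
    }
}
```

With an interval of 100, a single poll(100) starts one commit whose three inner polls each cross the interval, yet maxCommitDepth stays at 1 because the flag suppresses the nested commits. The other option mentioned in the comment (updating the values of the outstanding commit instead of skipping) is not shown here.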

To clarify the issue: in poll(timeout), we check if it is time to autocommit and call commit, which then calls commitOffsets. In commitOffsets, we have a while(true) loop and in it we poll for both sync and async. If that polling process takes long enough, then we could hit the next interval and those poll() calls could trigger another call to commit. Now we have 2 calls to commit on the stack.

I don't think this is likely, and I'm not certain there's a condition where you can get stuck in the loop that long when using async commits. But since the logic in commitOffsets currently checks response.isReady first, *then* breaks if it's async, I thought it might be possible that during a connectivity issue with the coordinator, you might just get stuck in this loop even in async commit mode and trigger this recursive commit behavior (given a long enough outage/short enough auto commit interval).


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java, line 15
> > <https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15>
> >
> > The classes named XResponse may be a bit confusing because the protocol
> > responses use that terminology. Future? Result?
>
> Jason Gustafson wrote:
>     Agreed. In fact, they were XResult initially. I changed them because
>     BrokerResult and CoordinatorResult didn't seem to suggest as clearly what
>     they were for as BrokerResponse and CoordinatorResponse. I considered Future
>     as well, but its usage is a bit different than traditional Java Futures.
>     Perhaps XReply?

Even though there's no blocking get(), XFuture might be the clearest. XReply would work, but has a similar issue: it gets confusing whether XResponse or XReply is the actual message received back vs. the processed data that you wanted to extract.


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/34789/#review86338
-----------------------------------------------------------


On June 4, 2015, 4:07 a.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
>
> (Updated June 4, 2015, 4:07 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
>
>
> KAFKA-2168; address review comments
>
>
> KAFKA-2186; fix rebase error and checkstyle issue
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java fac79951d50ef6f19cef5fe62cbc4582b27b145a
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java c5e577ff98bea3de65e290d30065935a29b3247f
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
>
> Diff: https://reviews.apache.org/r/34789/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>