> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212>
> >
> >     -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
>     I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.

Yes, you are right. I missed that.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797>
> >
> >     Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> >     
> >     Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
>     Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.

Making this async may be fine. One implication is that we call position() 
immediately after seekToBeginning(), we may not be able to get the correct 
offset.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  cee75410127dd1b86c1156563003216d93a086b3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> 677edd385f35d4262342b567262c0b874876d25b 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>

Reply via email to