> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212>
> >
> >     -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.
> 
> Jason Gustafson wrote:
>     I might be wrong, but I think we can still use NetworkClient.wakeup to 
> interrupt a poll call which is waiting forever.
> 
> Jun Rao wrote:
>     Yes, you are right. I missed that.
> 
> Jun Rao wrote:
>     Actually, this seems to be still a problem. The issus is that when 
> NetworkClient.wakeup is made, NetworkClient.poll may not be called. In the 
> case, the next NetworkClient.poll may still block for the timeout.

>From the javadocs for Selector, the wakeup will apply to the next poll if one 
>is not in progress. But perhaps we should just check the wakeup flag before 
>entering the poll to be safe.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797>
> >
> >     Hmm, seekToBegining() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> >     
> >     Ditto for seekToEnd().
> 
> Jason Gustafson wrote:
>     Since we always update fetch positions before a new fetch and in 
> position(), it didn't seem necessary to make it synchronous. I thought this 
> handling might be more consistent with how new subscriptions are handled 
> (which are asynchronous and defer the initial offset fetch until the next 
> poll or position). That being said, I don't have a strong feeling about it, 
> so we could return to the blocking version.
> 
> Jun Rao wrote:
>     Making this async may be fine. One implication is that we call position() 
> immediately after seekToBeginning(), we may not be able to get the correct 
> offset.

We should be able to get the right offset since we always update offsets before 
returning the current position, but we might have to block for it. It's similar 
to if you call subscribe(topic) and then try to get its position immediately.


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  cee75410127dd1b86c1156563003216d93a086b3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> 677edd385f35d4262342b567262c0b874876d25b 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  419541011d652becf0cda7a5e62ce813cddb1732 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>

Reply via email to