[ 
https://issues.apache.org/jira/browse/KAFKA-520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13601190#comment-13601190
 ] 

David Arthur commented on KAFKA-520:
------------------------------------

This has bitten me as well. Under the covers, ConsumerIterator is polling a 
BlockingQueue anyway, so maybe it does make sense to expose this API to users.
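
To illustrate what exposing a queue-style API could look like, here is a 
minimal sketch using only java.util.concurrent. The class and method names 
(QueuePollSketch, pollMessage) are hypothetical and are not part of Kafka; the 
point is only that a timed poll reports "nothing arrived" with a null return 
value rather than an exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; this is not Kafka's actual consumer code.
class QueuePollSketch {

    /**
     * Waits up to timeoutMs for the next message.
     * Returns null on timeout instead of throwing an exception.
     */
    static String pollMessage(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        return queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("message-1");

        // A message is available, so this returns immediately.
        System.out.println(pollMessage(queue, 100));
        // The queue is now empty, so this waits for the timeout and yields null.
        System.out.println(pollMessage(queue, 100));
    }
}
```

With this shape the caller decides how long to wait on each call, and the 
blocking behaviour is visible in the method signature.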
                
> ConsumerIterator implemented by KafkaStream doesn't follow Java practices
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-520
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.7, 0.7.1
>            Reporter: Esko Suomi
>
> As a foreword, this only applies to Java conventions - if things are 
> different on the Scala side, then so be it and that's fine.
> As mentioned in the summary, ConsumerIterator doesn't follow proper Java 
> practices; to be exact, it doesn't follow them in its behavior. The 
> biggest offender is the #hasNext() method, which blocks until 
> ConsumerTimeoutException is thrown. While it is obvious that this is because 
> the targeted use case is consuming a given topic indefinitely, it did confuse 
> me as an API integration programmer, since the documentation was severely 
> lacking and I only started to observe this problem in our staging environment.
> There are multiple ways that I find appropriate to fix this:
> - Instead of implementing java.util.Iterator, make the class an 
> implementation of BlockingQueue. Since BlockingQueue is in the 
> java.util.concurrent package, it should immediately steer the user toward 
> the right expectations about the class's semantics.
> - Get rid of the concept of internal infinite iteration and instead make the 
> Iterator represent one fetched block of data; that way the infinite loop for 
> consuming can be something like
> while (!Thread.currentThread().isInterrupted()) {
>     Iterator it = ks.readMore(...);
>     while (it.hasNext()) {
>         Object message = it.next();
>         /* consume message */
>     }
> }
> In addition to a clearer Java API, this also gets rid of the exception being 
> used for flow control, which, once again, doesn't fit Java best practices.
> - Update the documentation (both API and quickstart) to explain how to 
> recover from such a failure.
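
The second option in the quoted description (one Iterator per fetched block) 
could be sketched roughly as follows. This is an assumption-laden 
illustration, not Kafka code: BatchReader and its readMore(maxBatch, timeoutMs) 
signature are hypothetical, and the in-memory BlockingQueue stands in for 
whatever the consumer fetches into. The returned iterator is finite, so 
hasNext() never blocks and no exception is needed for flow control.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; BatchReader and readMore are illustrative names only.
class BatchReader {
    private final BlockingQueue<String> queue;

    BatchReader(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Waits up to timeoutMs for a first message, then drains whatever else is
     * already queued, up to maxBatch messages in total. An empty iterator
     * means nothing arrived within the timeout.
     */
    Iterator<String> readMore(int maxBatch, long timeoutMs) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            // drainTo does not block; it only takes what is already available.
            queue.drainTo(batch, maxBatch - 1);
        }
        return batch.iterator();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("m1");
        queue.put("m2");
        queue.put("m3");

        BatchReader reader = new BatchReader(queue);
        // Stop at the first empty batch so the demo terminates; a real
        // consumer would keep looping until interrupted.
        Iterator<String> it = reader.readMore(2, 100);
        while (it.hasNext()) {
            while (it.hasNext()) {
                System.out.println("consumed " + it.next());
            }
            it = reader.readMore(2, 100);
        }
    }
}
```

The trade-off is that the caller owns the outer loop and the timeout policy, 
which is more code than a single for-each but keeps the Iterator contract 
intact.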

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
