[ 
https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214897#comment-14214897
 ] 

Ewen Cheslack-Postava commented on KAFKA-1780:
----------------------------------------------

Patch attached adds a NonBlocingIteratorTemplate subclass for IteratorTemplate 
and makes ConsumerIterator implement it. A few notes:

* I moved peek() and added poll() to a subclass of IteratorTemplate because not 
all implementations of IteratorTemplate can implement them correctly.
* The logic to figure out timeouts and throw ConsumerTimeoutExceptions is now a 
bit confusing because we have 2 ways of setting timeouts. Would have been nicer 
to just do the peek()/poll() in the first place.
* Adjusted the constraints on the IteratorTemplate/NonBlockingIteratorTemplate 
item type. The code already assumed nullable types, but didn't enforce it. In 
fact, the IteratorTemplateTest used a non-nullable type and just happens to 
work ok. As far as I can tell, adding this restriction has no negative effects. 
Not adding it and using null.asInstanceOf[T] where necessary permits invalid 
iterator implementations.
* Removed peek() from IteratorTemplate. It wasn't implemented correctly anyway 
and isn't used anywhere in the Kafka code. However, it is a public interface, 
so I'm not sure if we should actually remove it.

> Add peek()/poll() for ConsumerIterator
> --------------------------------------
>
>                 Key: KAFKA-1780
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1780
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>         Attachments: KAFKA-1780.patch
>
>
> Currently, all consumer operations (next(), haveNext()) block. This is 
> problematic for a couple of use cases. Most obviously, a peek() method would 
> be nice so you can at least check whether any data is immediately available, 
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages 
> for up to N ms or M messages, and returns whatever it has at the end of that 
> period. It's possible to approximate that with peek, but requires aggressive 
> polling to match the proxy's timeout. A poll(timeout) method would allow for 
> a correct implementation, where each call to poll gets a single message, but 
> also allows the user to specify a custom timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to