[ https://issues.apache.org/jira/browse/KAFKA-1780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214897#comment-14214897 ]
Ewen Cheslack-Postava commented on KAFKA-1780:
----------------------------------------------

Patch attached adds a NonBlockingIteratorTemplate subclass for IteratorTemplate and makes ConsumerIterator implement it. A few notes:

* I moved peek() and added poll() to a subclass of IteratorTemplate because not all implementations of IteratorTemplate can implement them correctly.
* The logic to figure out timeouts and throw ConsumerTimeoutExceptions is now a bit confusing because we have 2 ways of setting timeouts. It would have been nicer to just have peek()/poll() in the first place.
* Adjusted the constraints on the IteratorTemplate/NonBlockingIteratorTemplate item type. The code already assumed nullable types, but didn't enforce it. In fact, the IteratorTemplateTest used a non-nullable type and just happened to work ok. As far as I can tell, adding this restriction has no negative effects. Not adding it and using null.asInstanceOf[T] where necessary permits invalid iterator implementations.
* Removed peek() from IteratorTemplate. It wasn't implemented correctly anyway and isn't used anywhere in the Kafka code. However, it is a public interface, so I'm not sure if we should actually remove it.

> Add peek()/poll() for ConsumerIterator
> --------------------------------------
>
>                 Key: KAFKA-1780
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1780
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>         Attachments: KAFKA-1780.patch
>
>
> Currently, all consumer operations (next(), hasNext()) block. This is
> problematic for a couple of use cases. Most obviously, a peek() method would
> be nice so you can at least check whether any data is immediately available,
> getting a null value back if it's not.
> A more difficult example is a proxy with a timeout, i.e. it consumes messages
> for up to N ms or M messages, and returns whatever it has at the end of that
> period. It's possible to approximate that with peek(), but that requires
> aggressive polling to match the proxy's timeout. A poll(timeout) method would
> allow for a correct implementation, where each call to poll() gets a single
> message, but also allows the user to specify a custom timeout.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
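
To make the proxy use case from the issue description concrete, here is a minimal Scala sketch of the peek()/poll(timeout) semantics and the "up to N ms or M messages" loop. It is not the code from KAFKA-1780.patch: QueueBackedIterator and TimedBatch are hypothetical names, and a java.util.concurrent.BlockingQueue stands in for the consumer's internal stream. The T >: Null bound illustrates the nullable-type restriction mentioned in the notes, since null is what signals "nothing available".

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a non-blocking iterator; not Kafka's actual class.
// T >: Null limits T to nullable types so that null can mean "no item".
class QueueBackedIterator[T >: Null](queue: BlockingQueue[T]) {
  // Returns the next item without consuming it, or null if none is ready.
  def peek(): T = queue.peek()

  // Waits up to timeoutMs for an item and consumes it; null on timeout.
  def poll(timeoutMs: Long): T = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
}

object TimedBatch {
  // Collects at most maxMessages items, giving up once timeoutMs has elapsed,
  // and returns whatever was gathered by then.
  def consume[T >: Null](it: QueueBackedIterator[T],
                         maxMessages: Int,
                         timeoutMs: Long): Seq[T] = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    val batch = ArrayBuffer.empty[T]
    var remainingMs = timeoutMs
    while (batch.size < maxMessages && remainingMs > 0) {
      // Each poll waits only for the time left in the overall budget.
      val item = it.poll(remainingMs)
      if (item != null) batch += item
      remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime())
    }
    batch.toList
  }
}
```

Because each poll() call is given only the remaining time budget, the loop needs no busy-waiting, which is the advantage over approximating the same behavior with repeated peek() calls.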