[ 
https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14008399#comment-14008399
 ] 

Claude Mamo commented on KAFKA-170:
-----------------------------------

I've been browsing for a non-blocking simple consumer library and I came across 
this issue. It so happens that I had modified the high-level consumer to use 
callbacks instead of blocking queues. I don't know if using non-blocking 
polling would be a better approach but here is a code example for those who are 
interested: 

{code:java}
val partitionSize = 4
val topicCountMap = new util.HashMap[EventHandler[String, String], Integer]()
val consumer = Consumer.create(...)
val cb = (messageHolder: MessageAndMetadata[String, String]) => {
   println(messageHolder.message)
}

topicCountMap.put(new EventHandler("MyTopic", cb), partitionSize)
consumer.createMessageStreams(topicCountMap, new StringDecoder(), new 
StringDecoder())
{code}

The code is part of the Kafka Web Console project and it can be found here: 
https://github.com/claudemamo/kafka-web-console/tree/master/app/kafka.

> Support for non-blocking polling on multiple streams
> ----------------------------------------------------
>
>                 Key: KAFKA-170
>                 URL: https://issues.apache.org/jira/browse/KAFKA-170
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8.0
>            Reporter: Jay Kreps
>            Priority: Critical
>              Labels: replication
>
> Currently we provide a blocking iterator in the consumer. This is a good 
> mechanism for consuming data from a single topic, but is limited as a 
> mechanism for polling multiple streams.
> For example if one wants to implement a non-blocking union across multiple 
> streams this is hard to do because calls may block indefinitely. A similar 
> situation arrises if trying to implement a streaming join of between two 
> streams.
> I would propose two changes:
> 1. Implement a next(timeout) interface on KafkaMessageStream. This will 
> easily handle some simple cases with minimal change. This handles certain 
> limited cases nicely and is easy to implement, but doesn't actually cover the 
> two cases above.
> 2. Add an interface to poll streams.
> I don't know the best approach for the later api, but it is important to get 
> it right. One option would be to add a 
> ConsumerConnector.drainTopics("topic1", "topic2", ...) which blocks until 
> there is at least one message and then returns a list of triples (topic, 
> partition, message).



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to