[ https://issues.apache.org/jira/browse/KAFKA-170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14008399#comment-14008399 ]
Claude Mamo commented on KAFKA-170:
-----------------------------------

I was looking for a non-blocking simple consumer library when I came across this issue. As it happens, I had already modified the high-level consumer to use callbacks instead of blocking queues. I don't know whether non-blocking polling would be a better approach, but here is a code example for those who are interested:

{code:java}
val partitionSize = 4
val topicCountMap = new util.HashMap[EventHandler[String, String], Integer]()
val consumer = Consumer.create(...)

// Callback that receives each consumed message
val cb = (messageHolder: MessageAndMetadata[String, String]) => {
  println(messageHolder.message)
}

// Map the handler (topic plus callback) to partitionSize
topicCountMap.put(new EventHandler("MyTopic", cb), partitionSize)
consumer.createMessageStreams(topicCountMap, new StringDecoder(), new StringDecoder())
{code}

The code is part of the Kafka Web Console project and can be found here: https://github.com/claudemamo/kafka-web-console/tree/master/app/kafka.

> Support for non-blocking polling on multiple streams
> ----------------------------------------------------
>
>                 Key: KAFKA-170
>                 URL: https://issues.apache.org/jira/browse/KAFKA-170
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>   Affects Versions: 0.8.0
>            Reporter: Jay Kreps
>            Priority: Critical
>              Labels: replication
>
> Currently we provide a blocking iterator in the consumer. This is a good
> mechanism for consuming data from a single topic, but is limited as a
> mechanism for polling multiple streams.
> For example, if one wants to implement a non-blocking union across multiple
> streams, this is hard to do because calls may block indefinitely. A similar
> situation arises when trying to implement a streaming join between two
> streams.
> I would propose two changes:
> 1. Implement a next(timeout) interface on KafkaMessageStream. This will
> easily handle some simple cases with minimal change. This handles certain
> limited cases nicely and is easy to implement, but doesn't actually cover the
> two cases above.
> 2. Add an interface to poll streams.
> I don't know the best approach for the latter API, but it is important to get
> it right. One option would be to add a
> ConsumerConnector.drainTopics("topic1", "topic2", ...) which blocks until
> there is at least one message and then returns a list of triples (topic,
> partition, message).

--
This message was sent by Atlassian JIRA
(v6.2#6252)
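To make the two proposals in the quoted description more concrete, here is a minimal sketch of what a next(timeout) call and a drainTopics-style call might look like. It uses only java.util.concurrent and is not Kafka code: Record, MultiStreamPoller, offer, next, and drain are all hypothetical names, and the sketch assumes every stream feeds one shared queue. Per-topic filtering, which drainTopics("topic1", "topic2", ...) implies, is left out.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the (topic, partition, message) triple.
final case class Record(topic: String, partition: Int, message: String)

// Hypothetical poller: all streams are assumed to feed one shared queue.
// Nothing here is Kafka API; it only mimics the shape of the proposal.
final class MultiStreamPoller {
  private val queue = new LinkedBlockingQueue[Record]()

  // Called by whatever threads fetch messages for the streams.
  def offer(record: Record): Unit = queue.put(record)

  // Analogue of next(timeout): wait up to timeoutMs milliseconds and
  // return None if nothing arrived in that time.
  def next(timeoutMs: Long): Option[Record] =
    Option(queue.poll(timeoutMs, TimeUnit.MILLISECONDS))

  // Analogue of drainTopics: block until at least one message is
  // available, then return it plus everything else already buffered.
  def drain(): List[Record] = {
    val out = ListBuffer(queue.take())
    var r = queue.poll()
    while (r != null) {
      out += r
      r = queue.poll()
    }
    out.toList
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val poller = new MultiStreamPoller
    println(poller.next(10)) // empty queue, so this gives up after 10 ms
    poller.offer(Record("topic1", 0, "a"))
    poller.offer(Record("topic2", 1, "b"))
    println(poller.drain())
  }
}
```

Because the caller never waits on a single stream's iterator, a union across topics cannot stall on one idle stream. A streaming join would still need its own buffering on top of this.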