Hi Jun, Thanks for following up. I removed the statement but still see no messages from the producer. Also when that statement is in with the single threaded consumer example, it prints "non-empty iterator" for its toString method versus "empty iterator" in the non working multi stream example.
Here is the code. When this is running in a loop, I have been sending messages via the console producer script. AppCongig.java -------------------------------------------------------------- import javax.inject.Named; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.producer.ProducerConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan("com.example.kafka") public class AppConfig { @Bean @Named("sharedProducerConsumerConfig") private static Properties sharedProducerConsumerConfig() { Properties properties = new Properties(); properties.put("zookeeper.connect", "127.0.0.1:2181"); properties.put("group.id", "group1"); properties.put("zookeeper.session.timeout.ms", "400"); properties.put("zookeeper.sync.time.ms", "200"); properties.put("auto.commit.interval.ms", "1000"); return properties; } @Bean @Named("consumerConfig") private static ConsumerConfig consumerConfig() { Properties properties = sharedProducerConsumerConfig(); return new ConsumerConfig(properties); } @Bean @Named("producerConfig") private static ProducerConfig producerConfig() { Properties properties = sharedProducerConsumerConfig(); properties.put("serializer.class", "kafka.serializer.StringEncoder"); properties.put("metadata.broker.list", "localhost:9092"); return new ProducerConfig(properties); } } Consumer.java ------------------------------------------------------------------- import kafka.consumer.KafkaStream; import kafka.consumer.ConsumerIterator; public class Consumer implements Runnable { private KafkaStream kafkaStream; private Integer threadNumber; public Consumer(KafkaStream kafkaStream, Integer threadNumber) { this.threadNumber = threadNumber; this.kafkaStream = kafkaStream; } public void run() { ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { break; } while(it.hasNext()) { System.out.println("Thread " + threadNumber + ": " + new String(it.next().message())); } } System.out.println("Shutting down Thread: " + threadNumber); } } ConsumerThreadPool.java (the run method does not work, the runSingleWorker method does work) --------------------------------------------------------------------------- -------------------- import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.consumer.ConsumerConfig; import kafka.javaapi.consumer.ConsumerConnector; import java.util.Map; import java.util.List; import java.util.HashMap; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import com.truecar.inventory.worker.core.application.config.AppConfig; public class ConsumerThreadPool { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; private static ApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); public ConsumerThreadPool(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context .getBean("consumerConfig")); this.topic = topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(Integer numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, numThreads); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> topicListeners = consumerMap.get(topic); executor = Executors.newFixedThreadPool(numThreads); for(Integer i = 0; i < numThreads; i++ ){ KafkaStream<byte[], byte[]> stream = topicListeners.get(i); executor.submit(new Consumer(stream, i)); } } public void runSingleWorker(Integer numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } while(it.hasNext()){ System.out.println(new String(it.next().message())); } } } } Pom.xml ------------------------------------------------------------------------ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>group1</groupId> <artifactId>artifact1</artifactId> <version>0.1.0</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <org.springframework.version>3.2.4.RELEASE</org.springframework.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>3.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.0-beta1</version> </dependency> <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> <version>1</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.3</version> </dependency> <dependency> <groupId>com.yammer.metrics</groupId> <artifactId>metrics-core</artifactId> <version>2.2.0</version> </dependency> </dependencies> <build> <finalName>inventory-core</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.0</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass > </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.dstovall</groupId> <artifactId>onejar-maven-plugin</artifactId> <version>1.4.4</version> <executions> <execution> <configuration> <onejarVersion>0.97</onejarVersion> <classifier>onejar</classifier> </configuration> <goals> <goal>one-jar</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <pluginRepositories> <pluginRepository> <id>onejar-maven-plugin.googlecode.com</id> <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url> </pluginRepository> </pluginRepositories> </project> -- On 8/28/13 8:24 AM, "Jun Rao" <jun...@gmail.com> wrote: >Could you remove the following statement and see if it works? > >System.out.println("Created iterator " + it.toString() + " thread number " >+ threadNumber); > >Thanks, > >Jun > > >On Tue, Aug 27, 2013 at 3:43 PM, David Williams ><dwilli...@truecar.com>wrote: > >> >> Hi all, >> >> I checked out the java source and looked at the java examples. They >> worked well in my IDE and on the console. However, I also tried the >> threaded example following the consumer group example. The problem is, >> this example is not working and toString on the stream iterator returns >>the >> words "empty iterator". Below, run2() method is the run method from the >> source code, THAT WORKS. The run() method below is from the Consumer >>Group >> Example and DOES NOT WORK. >> >> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example >> >> It simply prints messages like >> >> Created iterator empty iterator thread number 9 >> Created iterator empty iterator thread number 1 >> Shutting down Thread: 1 >> Created iterator empty iterator thread number 3 >> >> And continues doing so as I produce message using the console producer >>and >> does not print messages. >> >> >> >> >> Im not sure if this is a versioning issue, or what might be the cause. >> But help is appreciated! >> >> >> >> Here is the Consumer class: >> >> import kafka.consumer.KafkaStream; >> import kafka.consumer.ConsumerIterator; >> >> public class Consumer implements Runnable { >> >> private KafkaStream kafkaStream; >> private Integer threadNumber; >> >> public Consumer(KafkaStream kafkaStream, Integer threadNumber) { >> this.threadNumber = threadNumber; >> this.kafkaStream = kafkaStream; >> } >> >> public void run() { >> ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); >> System.out.println("Created iterator " + it.toString() + " >>thread >> number " + threadNumber); >> while(it.hasNext()) { >> System.out.println("Thread " + threadNumber + ": " + new >> String(it.next().message())); >> >> // validate >> // enrich >> // dispatch >> } >> System.out.println("Shutting down Thread: " + threadNumber); >> } >> >> } >> >> >> >> >> In my ConsumerThreadPool class: >> >> >> public class ConsumerThreadPool { >> >> private final ConsumerConnector consumer; >> private final String topic; >> >> private ExecutorService executor; >> private static ApplicationContext context = new >> AnnotationConfigApplicationContext(AppConfig.class); >> >> public ConsumerThreadPool(String topic) { >> consumer = >> >>kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)conte >>xt.getBean("consumerConfig")); >> this.topic = topic; >> } >> >> public void shutdown() { >> if (consumer != null) consumer.shutdown(); >> if (executor != null) executor.shutdown(); >> } >> >> public void run(Integer numThreads) { >> Map<String, Integer> topicCountMap = new HashMap<String, >> Integer>(); >> >> topicCountMap.put(topic, new Integer(numThreads)); >> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = >> consumer.createMessageStreams(topicCountMap); >> List<KafkaStream<byte[], byte[]>> streams = >>consumerMap.get(topic); >> >> // create threads >> executor = Executors.newFixedThreadPool(numThreads); >> >> // now create an object to consume the messages >> Integer threadNumber = 0; >> for(KafkaStream<byte[], byte[]> stream : streams) { >> executor.submit(new Consumer(stream, threadNumber)); >> threadNumber++; >> } >> } >> >> >> public void run2() { >> Map<String, Integer> topicCountMap = new HashMap<String, >> Integer>(); >> >> topicCountMap.put(topic, new Integer(1)); >> >> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = >> consumer.createMessageStreams(topicCountMap); >> >> KafkaStream<byte[], byte[]> stream = >> consumerMap.get(topic).get(0); >> ConsumerIterator<byte[], byte[]> it = stream.iterator(); >> while(true) { >> try { >> Thread.sleep(1000); >> } catch (InterruptedException e) { >> e.printStackTrace(); >> } >> while(it.hasNext()){ >> System.out.println(new String(it.next().message())); >> >> } >> } >> } >> >> } >> >> >> >> The AppConfig is pretty simple: >> >> @Configuration >> @ComponentScan("com.truecar.inventory.worker.core") >> public class AppConfig { >> >> @Bean >> @Named("sharedProducerConsumerConfig") >> private static Properties sharedProducerConsumerConfig() { >> Properties properties = new Properties(); >> properties.put("zookeeper.connect", "127.0.0.1:2181"); >> properties.put("group.id", "intelligence"); >> properties.put("zookeeper.session.timeout.ms", "400"); >> properties.put("zookeeper.sync.time.ms", "200"); >> properties.put("auto.commit.interval.ms", "1000"); >> return properties; >> } >> >> @Bean >> @Named("consumerConfig") >> private static ConsumerConfig consumerConfig() { >> Properties properties = sharedProducerConsumerConfig(); >> return new ConsumerConfig(properties); >> } >> >> @Bean >> @Named("producerConfig") >> private static ProducerConfig producerConfig() { >> Properties properties = sharedProducerConsumerConfig(); >> properties.put("serializer.class", >> "kafka.serializer.StringEncoder"); >> properties.put("metadata.broker.list", "localhost:9092"); >> return new ProducerConfig(properties); >> } >> >> } >> >> >> -- >> >>