Do you "Shutting down Thread: " in the output? Are all threads shut down?
Thanks, Jun On Wed, Aug 28, 2013 at 1:52 PM, David Williams <dwilli...@truecar.com>wrote: > Hi Jun, > > Thanks for following up. I removed the statement but still see no > messages from the producer. Also when that statement is in with the > single threaded consumer example, it prints "non-empty iterator" for its > toString method versus "empty iterator" in the non working multi stream > example. > > Here is the code. When this is running in a loop, I have been sending > messages via the console producer script. > > > AppCongig.java > -------------------------------------------------------------- > import javax.inject.Named; > import java.util.Properties; > import kafka.consumer.ConsumerConfig; > import kafka.producer.ProducerConfig; > import org.springframework.context.annotation.Bean; > import org.springframework.context.annotation.ComponentScan; > import org.springframework.context.annotation.Configuration; > > @Configuration > @ComponentScan("com.example.kafka") > public class AppConfig { > > @Bean > @Named("sharedProducerConsumerConfig") > private static Properties sharedProducerConsumerConfig() { > Properties properties = new Properties(); > properties.put("zookeeper.connect", "127.0.0.1:2181"); > properties.put("group.id", "group1"); > properties.put("zookeeper.session.timeout.ms", "400"); > properties.put("zookeeper.sync.time.ms", "200"); > properties.put("auto.commit.interval.ms", "1000"); > return properties; > } > > @Bean > @Named("consumerConfig") > private static ConsumerConfig consumerConfig() { > Properties properties = sharedProducerConsumerConfig(); > return new ConsumerConfig(properties); > } > > @Bean > @Named("producerConfig") > private static ProducerConfig producerConfig() { > Properties properties = sharedProducerConsumerConfig(); > properties.put("serializer.class", > "kafka.serializer.StringEncoder"); > properties.put("metadata.broker.list", "localhost:9092"); > return new ProducerConfig(properties); > } > > } > > > > > Consumer.java > ------------------------------------------------------------------- > import kafka.consumer.KafkaStream; > import kafka.consumer.ConsumerIterator; > > public class Consumer implements Runnable { > > private KafkaStream kafkaStream; > private Integer threadNumber; > > public Consumer(KafkaStream kafkaStream, Integer threadNumber) { > this.threadNumber = threadNumber; > this.kafkaStream = kafkaStream; > } > > public void run() { > ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); > while(true) { > > try { > Thread.sleep(1000); > } catch (InterruptedException e) { > break; > } > > while(it.hasNext()) { > > System.out.println("Thread " + threadNumber + ": " + new > String(it.next().message())); > > } > } > System.out.println("Shutting down Thread: " + threadNumber); > } > } > > > > > > ConsumerThreadPool.java (the run method does not work, the runSingleWorker > method does work) > --------------------------------------------------------------------------- > -------------------- > import kafka.consumer.ConsumerIterator; > import kafka.consumer.KafkaStream; > import kafka.consumer.ConsumerConfig; > import kafka.javaapi.consumer.ConsumerConnector; > > import java.util.Map; > import java.util.List; > import java.util.HashMap; > import java.util.concurrent.Executors; > import java.util.concurrent.ExecutorService; > > import org.springframework.context.ApplicationContext; > import > org.springframework.context.annotation.AnnotationConfigApplicationContext; > > import com.truecar.inventory.worker.core.application.config.AppConfig; > > public class ConsumerThreadPool { > > private final ConsumerConnector consumer; > private final String topic; > > private ExecutorService executor; > private static ApplicationContext context = new > AnnotationConfigApplicationContext(AppConfig.class); > > public ConsumerThreadPool(String topic) { > consumer = > kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context > .getBean("consumerConfig")); > this.topic = topic; > } > > public void shutdown() { > if (consumer != null) consumer.shutdown(); > if (executor != null) executor.shutdown(); > } > > public void run(Integer numThreads) { > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, numThreads); > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > List<KafkaStream<byte[], byte[]>> topicListeners = > consumerMap.get(topic); > > executor = Executors.newFixedThreadPool(numThreads); > > for(Integer i = 0; i < numThreads; i++ ){ > KafkaStream<byte[], byte[]> stream = topicListeners.get(i); > executor.submit(new Consumer(stream, i)); > } > } > > > public void runSingleWorker(Integer numThreads) { > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, new Integer(1)); > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > > KafkaStream<byte[], byte[]> stream = > consumerMap.get(topic).get(0); > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > while(true) { > try { > Thread.sleep(1000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > while(it.hasNext()){ > System.out.println(new String(it.next().message())); > > } > } > } > > } > > > > > > > Pom.xml > ------------------------------------------------------------------------ > <?xml version="1.0" encoding="UTF-8"?> > <project > xmlns="http://maven.apache.org/POM/4.0.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation=" > http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd"> > <modelVersion>4.0.0</modelVersion> > <groupId>group1</groupId> > <artifactId>artifact1</artifactId> > <version>0.1.0</version> > <packaging>jar</packaging> > <properties> > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > > <org.springframework.version>3.2.4.RELEASE</org.springframework.version> > </properties> > <dependencies> > <dependency> > <groupId>org.springframework</groupId> > <artifactId>spring-core</artifactId> > <version>3.2.4.RELEASE</version> > </dependency> > <dependency> > <groupId>org.springframework</groupId> > <artifactId>spring-context</artifactId> > <version>3.2.4.RELEASE</version> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka_2.9.2</artifactId> > <version>0.8.0-beta1</version> > </dependency> > <dependency> > <groupId>javax.inject</groupId> > <artifactId>javax.inject</artifactId> > <version>1</version> > </dependency> > <dependency> > <groupId>org.scala-lang</groupId> > <artifactId>scala-library</artifactId> > <version>2.9.2</version> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>1.2.17</version> > </dependency> > <dependency> > <groupId>com.101tec</groupId> > <artifactId>zkclient</artifactId> > <version>0.3</version> > </dependency> > <dependency> > <groupId>com.yammer.metrics</groupId> > <artifactId>metrics-core</artifactId> > <version>2.2.0</version> > </dependency> > </dependencies> > <build> > <finalName>inventory-core</finalName> > <plugins> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.0</version> > <configuration> > <source>1.7</source> > <target>1.7</target> > </configuration> > </plugin> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-jar-plugin</artifactId> > <configuration> > <archive> > <manifest> > > <mainClass>com.truecar.inventory.worker.core.application.Starter</mainClass > > > </manifest> > </archive> > </configuration> > </plugin> > <plugin> > <groupId>org.dstovall</groupId> > <artifactId>onejar-maven-plugin</artifactId> > <version>1.4.4</version> > <executions> > <execution> > <configuration> > <onejarVersion>0.97</onejarVersion> > <classifier>onejar</classifier> > </configuration> > <goals> > <goal>one-jar</goal> > </goals> > </execution> > </executions> > </plugin> > </plugins> > </build> > <pluginRepositories> > <pluginRepository> > <id>onejar-maven-plugin.googlecode.com</id> > > <url>http://onejar-maven-plugin.googlecode.com/svn/mavenrepo</url> > </pluginRepository> > </pluginRepositories> > </project> > > > > > > > > > > > -- > > > > > > > On 8/28/13 8:24 AM, "Jun Rao" <jun...@gmail.com> wrote: > > >Could you remove the following statement and see if it works? > > > >System.out.println("Created iterator " + it.toString() + " thread number " > >+ threadNumber); > > > >Thanks, > > > >Jun > > > > > >On Tue, Aug 27, 2013 at 3:43 PM, David Williams > ><dwilli...@truecar.com>wrote: > > > >> > >> Hi all, > >> > >> I checked out the java source and looked at the java examples. They > >> worked well in my IDE and on the console. However, I also tried the > >> threaded example following the consumer group example. The problem is, > >> this example is not working and toString on the stream iterator returns > >>the > >> words "empty iterator". Below, run2() method is the run method from the > >> source code, THAT WORKS. The run() method below is from the Consumer > >>Group > >> Example and DOES NOT WORK. > >> > >> > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example > >> > >> It simply prints messages like > >> > >> Created iterator empty iterator thread number 9 > >> Created iterator empty iterator thread number 1 > >> Shutting down Thread: 1 > >> Created iterator empty iterator thread number 3 > >> > >> And continues doing so as I produce message using the console producer > >>and > >> does not print messages. > >> > >> > >> > >> > >> Im not sure if this is a versioning issue, or what might be the cause. > >> But help is appreciated! > >> > >> > >> > >> Here is the Consumer class: > >> > >> import kafka.consumer.KafkaStream; > >> import kafka.consumer.ConsumerIterator; > >> > >> public class Consumer implements Runnable { > >> > >> private KafkaStream kafkaStream; > >> private Integer threadNumber; > >> > >> public Consumer(KafkaStream kafkaStream, Integer threadNumber) { > >> this.threadNumber = threadNumber; > >> this.kafkaStream = kafkaStream; > >> } > >> > >> public void run() { > >> ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); > >> System.out.println("Created iterator " + it.toString() + " > >>thread > >> number " + threadNumber); > >> while(it.hasNext()) { > >> System.out.println("Thread " + threadNumber + ": " + new > >> String(it.next().message())); > >> > >> // validate > >> // enrich > >> // dispatch > >> } > >> System.out.println("Shutting down Thread: " + threadNumber); > >> } > >> > >> } > >> > >> > >> > >> > >> In my ConsumerThreadPool class: > >> > >> > >> public class ConsumerThreadPool { > >> > >> private final ConsumerConnector consumer; > >> private final String topic; > >> > >> private ExecutorService executor; > >> private static ApplicationContext context = new > >> AnnotationConfigApplicationContext(AppConfig.class); > >> > >> public ConsumerThreadPool(String topic) { > >> consumer = > >> > >>kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)conte > >>xt.getBean("consumerConfig")); > >> this.topic = topic; > >> } > >> > >> public void shutdown() { > >> if (consumer != null) consumer.shutdown(); > >> if (executor != null) executor.shutdown(); > >> } > >> > >> public void run(Integer numThreads) { > >> Map<String, Integer> topicCountMap = new HashMap<String, > >> Integer>(); > >> > >> topicCountMap.put(topic, new Integer(numThreads)); > >> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > >> consumer.createMessageStreams(topicCountMap); > >> List<KafkaStream<byte[], byte[]>> streams = > >>consumerMap.get(topic); > >> > >> // create threads > >> executor = Executors.newFixedThreadPool(numThreads); > >> > >> // now create an object to consume the messages > >> Integer threadNumber = 0; > >> for(KafkaStream<byte[], byte[]> stream : streams) { > >> executor.submit(new Consumer(stream, threadNumber)); > >> threadNumber++; > >> } > >> } > >> > >> > >> public void run2() { > >> Map<String, Integer> topicCountMap = new HashMap<String, > >> Integer>(); > >> > >> topicCountMap.put(topic, new Integer(1)); > >> > >> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > >> consumer.createMessageStreams(topicCountMap); > >> > >> KafkaStream<byte[], byte[]> stream = > >> consumerMap.get(topic).get(0); > >> ConsumerIterator<byte[], byte[]> it = stream.iterator(); > >> while(true) { > >> try { > >> Thread.sleep(1000); > >> } catch (InterruptedException e) { > >> e.printStackTrace(); > >> } > >> while(it.hasNext()){ > >> System.out.println(new String(it.next().message())); > >> > >> } > >> } > >> } > >> > >> } > >> > >> > >> > >> The AppConfig is pretty simple: > >> > >> @Configuration > >> @ComponentScan("com.truecar.inventory.worker.core") > >> public class AppConfig { > >> > >> @Bean > >> @Named("sharedProducerConsumerConfig") > >> private static Properties sharedProducerConsumerConfig() { > >> Properties properties = new Properties(); > >> properties.put("zookeeper.connect", "127.0.0.1:2181"); > >> properties.put("group.id", "intelligence"); > >> properties.put("zookeeper.session.timeout.ms", "400"); > >> properties.put("zookeeper.sync.time.ms", "200"); > >> properties.put("auto.commit.interval.ms", "1000"); > >> return properties; > >> } > >> > >> @Bean > >> @Named("consumerConfig") > >> private static ConsumerConfig consumerConfig() { > >> Properties properties = sharedProducerConsumerConfig(); > >> return new ConsumerConfig(properties); > >> } > >> > >> @Bean > >> @Named("producerConfig") > >> private static ProducerConfig producerConfig() { > >> Properties properties = sharedProducerConsumerConfig(); > >> properties.put("serializer.class", > >> "kafka.serializer.StringEncoder"); > >> properties.put("metadata.broker.list", "localhost:9092"); > >> return new ProducerConfig(properties); > >> } > >> > >> } > >> > >> > >> -- > >> > >> > >