Could you remove the following statement and see if it works? System.out.println("Created iterator " + it.toString() + " thread number " + threadNumber);
Thanks, Jun On Tue, Aug 27, 2013 at 3:43 PM, David Williams <dwilli...@truecar.com> wrote: > > Hi all, > > I checked out the Java source and looked at the Java examples. They > worked well in my IDE and on the console. However, I also tried the > threaded example following the consumer group example. The problem is, > this example is not working and toString on the stream iterator returns the > words "empty iterator". Below, the run2() method is the run method from the > source code, THAT WORKS. The run() method below is from the Consumer Group > Example and DOES NOT WORK. > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example > > It simply prints messages like > > Created iterator empty iterator thread number 9 > Created iterator empty iterator thread number 1 > Shutting down Thread: 1 > Created iterator empty iterator thread number 3 > > And continues doing so as I produce messages using the console producer and > does not print messages. > > > > > I'm not sure if this is a versioning issue, or what might be the cause. > But help is appreciated!
> > > > Here is the Consumer class: > > import kafka.consumer.KafkaStream; > import kafka.consumer.ConsumerIterator; > > public class Consumer implements Runnable { > > private KafkaStream kafkaStream; > private Integer threadNumber; > > public Consumer(KafkaStream kafkaStream, Integer threadNumber) { > this.threadNumber = threadNumber; > this.kafkaStream = kafkaStream; > } > > public void run() { > ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); > System.out.println("Created iterator " + it.toString() + " thread > number " + threadNumber); > while(it.hasNext()) { > System.out.println("Thread " + threadNumber + ": " + new > String(it.next().message())); > > // validate > // enrich > // dispatch > } > System.out.println("Shutting down Thread: " + threadNumber); > } > > } > > > > > In my ConsumerThreadPool class: > > > public class ConsumerThreadPool { > > private final ConsumerConnector consumer; > private final String topic; > > private ExecutorService executor; > private static ApplicationContext context = new > AnnotationConfigApplicationContext(AppConfig.class); > > public ConsumerThreadPool(String topic) { > consumer = > kafka.consumer.Consumer.createJavaConsumerConnector((ConsumerConfig)context.getBean("consumerConfig")); > this.topic = topic; > } > > public void shutdown() { > if (consumer != null) consumer.shutdown(); > if (executor != null) executor.shutdown(); > } > > public void run(Integer numThreads) { > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, new Integer(numThreads)); > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); > > // create threads > executor = Executors.newFixedThreadPool(numThreads); > > // now create an object to consume the messages > Integer threadNumber = 0; > for(KafkaStream<byte[], byte[]> stream : streams) { > executor.submit(new 
Consumer(stream, threadNumber)); > threadNumber++; > } > } > > > public void run2() { > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > > topicCountMap.put(topic, new Integer(1)); > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > > KafkaStream<byte[], byte[]> stream = > consumerMap.get(topic).get(0); > ConsumerIterator<byte[], byte[]> it = stream.iterator(); > while(true) { > try { > Thread.sleep(1000); > } catch (InterruptedException e) { > e.printStackTrace(); > } > while(it.hasNext()){ > System.out.println(new String(it.next().message())); > > } > } > } > > } > > > > The AppConfig is pretty simple: > > @Configuration > @ComponentScan("com.truecar.inventory.worker.core") > public class AppConfig { > > @Bean > @Named("sharedProducerConsumerConfig") > private static Properties sharedProducerConsumerConfig() { > Properties properties = new Properties(); > properties.put("zookeeper.connect", "127.0.0.1:2181"); > properties.put("group.id", "intelligence"); > properties.put("zookeeper.session.timeout.ms", "400"); > properties.put("zookeeper.sync.time.ms", "200"); > properties.put("auto.commit.interval.ms", "1000"); > return properties; > } > > @Bean > @Named("consumerConfig") > private static ConsumerConfig consumerConfig() { > Properties properties = sharedProducerConsumerConfig(); > return new ConsumerConfig(properties); > } > > @Bean > @Named("producerConfig") > private static ProducerConfig producerConfig() { > Properties properties = sharedProducerConsumerConfig(); > properties.put("serializer.class", > "kafka.serializer.StringEncoder"); > properties.put("metadata.broker.list", "localhost:9092"); > return new ProducerConfig(properties); > } > > } > > > -- > >