Commenting out the example.shutdown() call did not seem to make a difference; I added the print statement below to highlight the fact.

The other threads still shut down and only one thread lives on; eventually that one dies after a few minutes as well.

Could it be that the producer's default partitioner isn't balancing data across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
Last Shutdown via example.shutDown called!
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down
15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped
15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed
15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped
15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
Shutting down Thread: 2
Shutting down Thread: 1
Shutting down Thread: 3
15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed
Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.990215000000035|40.663669999999911
15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup
Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xiaotao...@gmail.com> wrote:

> example.shutdown(); in ConsumerGroupExample closes all consumer connections
> to Kafka. Remove this line and the consumer threads will run forever.
>
> On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I am trying to get a multi-threaded HL consumer working against a 2-broker
> > Kafka cluster with a 4-partition, 2-replica topic.
> >
> > The consumer code is set to run with 4 threads, one for each partition.
> >
> > The producer code uses the default partitioner and loops indefinitely,
> > feeding events into the topic. (I excluded the while loop in the paste
> > below.)
> >
> > What I see is that the threads eventually all exit, even though the
> > producer is still sending events into the topic.
> >
> > My understanding is that one consumer thread per partition is the correct
> > setup.
> >
> > Any ideas why this code doesn't continue to consume events as they are
> > pushed to the topic?
> >
> > I suspect I am configuring something wrong here, but am not sure what.
> >
> > Thanks,
> >
> > Chris
> >
> >
> > *Topic Configuration*
> >
> > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > *Producer Code:*
> >
> > Properties props = new Properties();
> > props.put("metadata.broker.list", args[0]);
> > props.put("zk.connect", args[1]);
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("request.required.acks", "1");
> > String TOPIC = args[2];
> > ProducerConfig config = new ProducerConfig(props);
> > Producer<String, String> producer = new Producer<String, String>(config);
> >
> > finalEvent = new Timestamp(new Date().getTime()) + "|"
> >         + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)]
> >         + "|" + getLatLong(arrayroute17[i]);
> > try {
> >     KeyedMessage<String, String> data = new KeyedMessage<String, String>(TOPIC, finalEvent);
> >     LOG.info("Sending Messge #: " + routeName[0] + ": " + i +", msg:" + finalEvent);
> >     producer.send(data);
> >     Thread.sleep(1000);
> > } catch (Exception e) {
> >     e.printStackTrace();
> > }
> >
> >
> > *Consumer Code:*
> >
> > public class ConsumerTest implements Runnable {
> >     private KafkaStream m_stream;
> >     private int m_threadNumber;
> >
> >     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> >         m_threadNumber = a_threadNumber;
> >         m_stream = a_stream;
> >     }
> >
> >     public void run() {
> >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> >         while (it.hasNext()){
> >             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
> >             try {
> >                 Thread.sleep(1000);
> >             }catch (InterruptedException e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >         System.out.println("Shutting down Thread: " + m_threadNumber);
> >     }
> > }
> >
> > public class ConsumerGroupExample {
> >     private final ConsumerConnector consumer;
> >     private final String topic;
> >     private ExecutorService executor;
> >
> >     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
> >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >                 createConsumerConfig(a_zookeeper, a_groupId));
> >         this.topic = a_topic;
> >     }
> >
> >     public void shutdown() {
> >         if (consumer != null) consumer.shutdown();
> >         if (executor != null) executor.shutdown();
> >         try {
> >             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
> >                 System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
> >             }
> >         } catch (InterruptedException e) {
> >             System.out.println("Interrupted during shutdown, exiting uncleanly");
> >         }
> >     }
> >
> >     public void run(int a_numThreads) {
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, new Integer(a_numThreads));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> >                 consumer.createMessageStreams(topicCountMap);
> >         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
> >         executor = Executors.newFixedThreadPool(a_numThreads);
> >         int threadNumber = 0;
> >         for (final KafkaStream stream : streams) {
> >             executor.submit(new ConsumerTest(stream, threadNumber));
> >             threadNumber++;
> >         }
> >     }
> >
> >     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", a_zookeeper);
> >         props.put("group.id", a_groupId);
> >         props.put("zookeeper.session.timeout.ms", "400");
> >         props.put("zookeeper.sync.time.ms", "200");
> >         props.put("auto.commit.interval.ms", "1000");
> >         props.put("consumer.timeout.ms", "-1");
> >         return new ConsumerConfig(props);
> >     }
> >
> >     public static void main(String[] args) {
> >         String zooKeeper = args[0];
> >         String groupId = args[1];
> >         String topic = args[2];
> >         int threads = Integer.parseInt(args[3]);
> >         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
> >         example.run(threads);
> >         try {
> >             Thread.sleep(10000);
> >         } catch (InterruptedException ie) {
> >
> >         }
> >         example.shutdown();
> >     }
> > }
> >
>
>
> --
> Regards,
> Tao
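
On the partitioner question at the top of the thread: the producer code quoted above builds each KeyedMessage from only a topic and a payload, so no key is set. As far as I know, the 0.8-era producer does not hash keyless messages. It picks a random partition and keeps using it until the next topic metadata refresh (topic.metadata.refresh.interval.ms, 10 minutes by default), so for long stretches only one partition, and hence one consumer thread, may receive data. The sketch below is a plain-Java model of how a key-hash partitioner spreads messages once a key is supplied. It is not Kafka's own partitioner code; the class name KeyedPartitionSketch, its methods, and the truck-ID keys are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;

// Simplified model of key-hash partitioning; NOT Kafka's own implementation.
final class KeyedPartitionSketch {

    // Map a key to a partition: clear the sign bit of the hash, then take the remainder.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Count how many of the given keys land on each partition.
    static int[] countByPartition(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] truckIds = {"1", "2", "3", "4"}; // hypothetical keys, e.g. truck IDs
        int numPartitions = 4;                    // matches the 4-partition topic in the thread
        for (String id : truckIds) {
            System.out.println("key " + id + " -> partition " + partitionFor(id, numPartitions));
        }
        System.out.println(Arrays.toString(countByPartition(truckIds, numPartitions)));
    }
}
```

With the real producer, the three-argument constructor new KeyedMessage<String, String>(TOPIC, key, finalEvent) would carry such a key. Whether that fully explains the idle consumer threads here is an assumption worth checking against the per-partition offsets.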