Commenting out Example shutdown did not seem to make a difference, I added
the print statement below to highlight the fact.

The other threads still shut down, and only one thread lives on, eventually
that dies after a few minutes as well

Could this be that the producer default partitioner is isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.990215000000035|40.663669999999911

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009

On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xiaotao...@gmail.com> wrote:

> example.shutdown(); in ConsumerGroupExample closes all consumer connections
> to Kafka. remove this line the consumer threads will run forever
>
> On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I am trying to get a multi threaded HL consumer working against a 2
> broker
> > Kafka cluster with a 4 partition 2 replica  topic.
> >
> > The consumer code is set to run with 4 threads, one for each partition.
> >
> > The producer code uses the default partitioner and loops indefinitely
> > feeding events into the topic.(I excluded the while loop in the paste
> > below)
> >
> > What I see is the threads eventually all exit, even thought the producer
> is
> > still sending events into the topic.
> >
> > My understanding is that the consumer thread per partition is the correct
> > setup.
> >
> > Any ideas why this code doesn't continue to consume events at they are
> > pushed to the topic?
> >
> > I suspect I am configuring something wrong here, but am not sure what.
> >
> > Thanks,
> >
> > Chris
> >
> >
> > *T**opic Configuration*
> >
> > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> >
> > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> >
> >  Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> >
> > *Producer Code:*
> >
> >      Properties props = new Properties();
> >
> >         props.put("metadata.broker.list", args[0]);
> >
> >         props.put("zk.connect", args[1]);
> >
> >         props.put("serializer.class", "kafka.serializer.StringEncoder");
> >
> >         props.put("request.required.acks", "1");
> >
> >         String TOPIC = args[2];
> >
> >         ProducerConfig config = new ProducerConfig(props);
> >
> >         Producer<String, String> producer = new Producer<String, String>(
> > config);
> >
> >         finalEvent = new Timestamp(new Date().getTime()) + "|"
> >
> >                     + truckIds[0] + "|" + driverIds[0] + "|" +
> > events[random
> > .nextInt(evtCnt)]
> >
> >                     + "|" + getLatLong(arrayroute17[i]);
> >
> >         try {
> >
> >                 KeyedMessage<String, String> data = new
> > KeyedMessage<String, String>(TOPIC, finalEvent);
> >
> >                 LOG.info("Sending Messge #: " + routeName[0] + ": " + i
> +",
> > msg:" + finalEvent);
> >
> >                 producer.send(data);
> >
> >                 Thread.sleep(1000);
> >
> >             } catch (Exception e) {
> >
> >                 e.printStackTrace();
> >
> >             }
> >
> >
> > *Consumer Code:*
> >
> > public class ConsumerTest implements Runnable {
> >
> >    private KafkaStream m_stream;
> >
> >    private int m_threadNumber;
> >
> >    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> >
> >        m_threadNumber = a_threadNumber;
> >
> >        m_stream = a_stream;
> >
> >    }
> >
> >    public void run() {
> >
> >        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> >
> >        while (it.hasNext()){
> >
> >            System.out.println("Thread " + m_threadNumber + ": " + new
> > String(it.next().message()));
> >
> >            try {
> >
> >              Thread.sleep(1000);
> >
> >             }catch (InterruptedException e) {
> >
> >                  e.printStackTrace();
> >
> >  }
> >
> >        }
> >
> >        System.out.println("Shutting down Thread: " + m_threadNumber);
> >
> >    }
> >
> > }
> >
> > public class ConsumerGroupExample {
> >
> >     private final ConsumerConnector consumer;
> >
> >     private final String topic;
> >
> >     private  ExecutorService executor;
> >
> >
> >
> >     public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> > String a_topic) {
> >
> >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> >
> >                 createConsumerConfig(a_zookeeper, a_groupId));
> >
> >         this.topic = a_topic;
> >
> >     }
> >
> >
> >
> >     public void shutdown() {
> >
> >         if (consumer != null) consumer.shutdown();
> >
> >         if (executor != null) executor.shutdown();
> >
> >         try {
> >
> >             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS))
> {
> >
> >                 System.out.println("Timed out waiting for consumer
> threads
> > to shut down, exiting uncleanly");
> >
> >             }
> >
> >         } catch (InterruptedException e) {
> >
> >             System.out.println("Interrupted during shutdown, exiting
> > uncleanly");
> >
> >         }
> >
> >    }
> >
> >
> >
> >     public void run(int a_numThreads) {
> >
> >         Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> >
> >         topicCountMap.put(topic, new Integer(a_numThreads));
> >
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> >
> >         List<KafkaStream<byte[], byte[]>> streams =
> consumerMap.get(topic);
> >
> >         executor = Executors.newFixedThreadPool(a_numThreads);
> >
> >         int threadNumber = 0;
> >
> >         for (final KafkaStream stream : streams) {
> >
> >             executor.submit(new ConsumerTest(stream, threadNumber));
> >
> >             threadNumber++;
> >
> >         }
> >
> >     }
> >
> >
> >
> >     private static ConsumerConfig createConsumerConfig(String
> a_zookeeper,
> > String a_groupId) {
> >
> >         Properties props = new Properties();
> >
> >         props.put("zookeeper.connect", a_zookeeper);
> >
> >         props.put("group.id", a_groupId);
> >
> >         props.put("zookeeper.session.timeout.ms", "400");
> >
> >         props.put("zookeeper.sync.time.ms", "200");
> >
> >         props.put("auto.commit.interval.ms", "1000");
> >
> >         props.put("consumer.timeout.ms", "-1");
> >
> >          return new ConsumerConfig(props);
> >
> >     }
> >
> >
> >
> >     public static void main(String[] args) {
> >
> >         String zooKeeper = args[0];
> >
> >         String groupId = args[1];
> >
> >         String topic = args[2];
> >
> >         int threads = Integer.parseInt(args[3]);
> >
> >         ConsumerGroupExample example = new
> ConsumerGroupExample(zooKeeper,
> > groupId, topic);
> >
> >         example.run(threads);
> >
> >         try {
> >
> >             Thread.sleep(10000);
> >
> >         } catch (InterruptedException ie) {
> >
> >
> >
> >         }
> >
> >         example.shutdown();
> >
> >     }
> >
> > }
> >
>
>
>
> --
> Regards,
> Tao
>

Reply via email to