Hi All,

I am trying to get a multi-threaded high-level (HL) consumer working against
a 2-broker Kafka cluster with a 4-partition, 2-replica topic.
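
For reference, the topic was created with something along these lines (the
ZooKeeper address below is a placeholder for my real one):

    bin/kafka-topics.sh --create --zookeeper zkhost:2181 --partitions 4 --replication-factor 2 --topic kafkatopicRep4Part4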

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely,
feeding events into the topic. (I excluded the while loop from the paste
below.)

What I see is that the threads all eventually exit, even though the producer
is still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are
pushed to the topic?

I suspect I am configuring something wrong here, but I am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic: kafkatopicRep4Part4  PartitionCount: 4  ReplicationFactor: 2  Configs:
    Topic: kafkatopicRep4Part4  Partition: 0  Leader: 1  Replicas: 2,1  Isr: 1,2
    Topic: kafkatopicRep4Part4  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: kafkatopicRep4Part4  Partition: 2  Leader: 1  Replicas: 2,1  Isr: 1,2
    Topic: kafkatopicRep4Part4  Partition: 3  Leader: 1  Replicas: 1,2  Isr: 1,2
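
The listing above is the output of the same tool's describe option
(placeholder ZooKeeper address again):

    bin/kafka-topics.sh --describe --zookeeper zkhost:2181 --topic kafkatopicRep4Part4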

*Producer Code:*

        Properties props = new Properties();
        props.put("metadata.broker.list", args[0]);
        props.put("zk.connect", args[1]);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        String TOPIC = args[2];
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        finalEvent = new Timestamp(new Date().getTime()) + "|"
                + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)]
                + "|" + getLatLong(arrayroute17[i]);
        try {
            // The message is sent without a key (topic and payload only).
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(TOPIC, finalEvent);
            LOG.info("Sending Message #: " + routeName[0] + ": " + i + ", msg: " + finalEvent);
            producer.send(data);
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
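
The producer takes the broker list, ZooKeeper connect string, and topic name
from the command line, so I launch it roughly like this (hosts and the main
class name are placeholders):

    java -cp <classpath> <ProducerMain> broker1:9092,broker2:9092 zkhost:2181 kafkatopicRep4Part4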


*Consumer Code:*

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerTest implements Runnable {

    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // hasNext() blocks for new messages (consumer.timeout.ms is -1) and
        // returns false once the connector is shut down.
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerGroupExample {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        // Ask for one stream per thread, then hand each stream to its own runnable.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "-1");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        // Let the consumers run for 10 seconds, then shut the connector and executor down.
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
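
I launch the consumer with the ZooKeeper connect string, consumer group id,
topic, and thread count, roughly like this (host and group id are
placeholders):

    java -cp <classpath> ConsumerGroupExample zkhost:2181 group1 kafkatopicRep4Part4 4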
