hi all,
   I am reading the book "apache kafka" and write a simple producer
and consumer class. the producer works but the consumer hangs.
   The producer class:
public static void main(String[] args) {
    String topic="test-topic";
    Properties props = new Properties();
    props.put("metadata.broker.list","linux157:9092");
    props.put("serializer.class","kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);
    Producer<Integer, String> producer = new Producer<Integer,String>(config);
    for(int i=0;i<100;i++){
        KeyedMessage<Integer, String> data = new
KeyedMessage<Integer,String>(topic, "msg"+i);
        producer.send(data);
    }
    producer.close();

}

public class TestKafkaConsumer {
private final ConsumerConnector consumer;
private final String topic;

public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
props));
this.topic = topic;
}

public void testConsumer() {
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer
             .createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
            System.out.println("Message from Single Topic :: "
                       + new String(consumerIte.next().message()));
        }
    if (consumer != null)
        consumer.shutdown();
}

public static void main(String[] args) {
    String topic = "test-topic";
    TestKafkaConsumer simpleHLConsumer = new
TestKafkaConsumer("linux157:2181",testgroup22", topic);
    simpleHLConsumer.testConsumer();

}

}

Reply via email to