Sorry, I sent the consumer code twice in my previous mail. Here is the producer code.

*Producer.java*

package kafka.examples;


import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer/* extends Thread*/
{
  private final kafka.javaapi.producer.Producer<Integer, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public Producer(String topic)
  {
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "localhost:9092");
    // Use the random partitioner. We don't need the key, so just set its type to Integer.
    // The message is of type String.
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
    System.out.println("Producer at "+this.topic);
  }

  public void putdata() {
    int messageNo = 1;
    while(messageNo < 100)
    {
      String messageStr = "Message_" + messageNo;
      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
      messageNo = messageNo +1;
    }
    producer.close();
    System.out.println("Producer exit");
  }

}
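
For reference, a minimal way to wire the two classes together could look like the
sketch below. The class name KafkaExampleRunner and the topic name "test" are only
placeholders, not my actual setup.

*KafkaExampleRunner.java* (sketch)

package kafka.examples;

public class KafkaExampleRunner
{
  public static void main(String[] args)
  {
    // Placeholder topic name; replace with the topic created on the broker.
    final String topic = "test";

    // Start the consumer in its own thread first so it is already waiting
    // on the stream before the producer begins sending.
    Thread consumerThread = new Thread(new Runnable() {
      public void run() {
        new Consumer(topic).readdata();
      }
    });
    consumerThread.start();

    // Send the 99 messages and close the producer.
    new Producer(topic).putdata();
  }
}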


On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
abhishek.bhattacharje...@gmail.com> wrote:

> Hello,
> I am new to kafka and facing some problem.
> My producer code works properly and sends data.
> But the consumer is not able to read it.
> Here are the codes for Producer and Consumer.
> Something is wrong with the Consumer.java code; can someone please help
> with this?
>
>
> *Producer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     public Consumer(String topic)
>     {
>         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>                 createConsumerConfig());
>         this.topic = topic;
>         System.out.println("Consumer at " + this.topic);
>     }
>
>     private static ConsumerConfig createConsumerConfig()
>     {
>         Properties props = new Properties();
>         props.put("zookeeper.connect", KafkaProperties.zkConnect);
>         props.put("group.id", KafkaProperties.groupId);
>         props.put("zookeeper.session.timeout.ms", "400");
>         props.put("zookeeper.sync.time.ms", "200");
>         props.put("auto.commit.interval.ms", "1000");
>
>         return new ConsumerConfig(props);
>     }
>
>     public void readdata() {
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                 consumer.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         System.out.println("Inside read data");
>         while (it.hasNext())
>             System.out.println(new String(it.next().message()));
>
>     }
> }
>
> And this is the consumer code.
>
> *Consumer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
>   private final ConsumerConnector consumer;
>   private final String topic;
>
>   public Consumer(String topic)
>   {
>     consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
>             createConsumerConfig());
>     this.topic = topic;
>     System.out.println("Consumer at "+topic);
>   }
>
>   private static ConsumerConfig createConsumerConfig()
>   {
>     Properties props = new Properties();
>     props.put("zookeeper.connect", KafkaProperties.zkConnect);
>     props.put("group.id", KafkaProperties.groupId);
>     props.put("zookeeper.session.timeout.ms", "400");
>     props.put("zookeeper.sync.time.ms", "200");
>     props.put("auto.commit.interval.ms", "1000");
>
>     return new ConsumerConfig(props);
>
>   }
>
>   public void readdata() {
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, new Integer(1));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>     KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
>     ConsumerIterator<byte[], byte[]> it = stream.iterator();
>     while(it.hasNext())
>       System.out.println(new String(it.next().message()));
>   }
> }
>
>
> Thanks.
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>



-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*
