Sorry, I sent the consumer code twice by mistake. This is the producer code.
*Producer.java*
package kafka.examples;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Minimal example producer that publishes a fixed batch of string messages
 * to a single Kafka topic.
 *
 * <p>The broker list is hard-coded to {@code localhost:9092} and messages are
 * encoded with {@code kafka.serializer.StringEncoder}.
 */
public class Producer/* extends Thread*/
{
    /** Underlying Kafka producer; closed at the end of {@link #putdata()}. */
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    /** Topic every message is sent to. */
    private final String topic;
    /** Configuration handed to the Kafka producer. */
    private final Properties props = new Properties();

    /**
     * Creates a producer bound to the given topic and prints a short banner.
     *
     * @param topic name of the topic that {@link #putdata()} publishes to
     */
    public Producer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
        System.out.println("Producer at "+this.topic);
    }

    /**
     * Sends the messages {@code Message_1} through {@code Message_99} to the
     * topic, then closes the underlying producer and prints an exit notice.
     *
     * <p>The producer is closed even if a send fails, so the connection is not
     * leaked; any exception thrown by {@code send} still propagates to the
     * caller.
     */
    public void putdata() {
        try {
            for (int messageNo = 1; messageNo < 100; messageNo++) {
                String messageStr = "Message_" + messageNo;
                producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            }
        } finally {
            // Release the producer regardless of whether every send succeeded.
            producer.close();
        }
        System.out.println("Producer exit");
    }
}
On Mon, Jan 20, 2014 at 8:46 PM, Abhishek Bhattacharjee <
[email protected]> wrote:
> Hello,
> I am new to Kafka and am facing a problem.
> My producer code works properly and sends data,
> but the consumer is not able to read it.
> Here is the code for the producer and the consumer.
> Something is wrong with the Consumer.java code; can someone please help
> with this?
>
>
> *Producer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
> private final ConsumerConnector consumer;
> private final String topic;
>
> public Consumer(String topic)
> {
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> createConsumerConfig());
> this.topic = topic;
> System.out.println("Consumer at "+this.topic);
> }
>
> private static ConsumerConfig createConsumerConfig()
> {
> Properties props = new Properties();
> props.put("zookeeper.connect", KafkaProperties.zkConnect);
> props.put("group.id", KafkaProperties.groupId);
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
>
> return new ConsumerConfig(props);
>
> }
>
> public void readdata() {
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> System.out.println("Inside read data");
> while(it.hasNext())
> System.out.println(new String(it.next().message()));
>
> }
> }
>
> And this is the consumer code.
>
> *Consumer.java*
>
> package kafka.examples;
>
>
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.message.Message;
>
>
> public class Consumer
> {
> private final ConsumerConnector consumer;
> private final String topic;
>
> public Consumer(String topic)
> {
> consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> createConsumerConfig());
> this.topic = topic;
> System.out.println("Consumer at "+topic);
> }
>
> private static ConsumerConfig createConsumerConfig()
> {
> Properties props = new Properties();
> props.put("zookeeper.connect", KafkaProperties.zkConnect);
> props.put("group.id", KafkaProperties.groupId);
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
>
> return new ConsumerConfig(props);
>
> }
>
> public void readdata() {
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topic, new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> while(it.hasNext())
> System.out.println(new String(it.next().message()));
> }
> }
>
>
> Thanks.
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*
>
--
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*