Hi, I have a simple High level Kafka Consumer like :
package matchinguu.kafka.consumer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.*;
public class SimpleHLConsumer {
private final ConsumerConnector consumer;
private final String topic;
public SimpleHLConsumer(String zookeeper, String groupId, String topic)
{
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new
ConsumerConfig(props));
this.topic = topic;
}
public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams
= consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams =
consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("------------");
System.out.println("Message from Single Topic: " + new
String(it.next().message()));
}
}
if (consumer != null) {
System.out.println("Shutdown Happens");
consumer.shutdown();
}
}
public static void main(String[] args) {
System.out.println("Consumer is now reading messages from
producer");
//String topic = args[0];
String topic = "test";
SimpleHLConsumer simpleHLConsumer = new
SimpleHLConsumer("localhost:2181", "testgroup", topic);
simpleHLConsumer.testConsumer();
}
}
I want to get my messages through Spark Java Streaming with Kafka
integration. Can anyone help me to reform this code so that I can get same
output with Spark Kafka integration.
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Simple-Kafka-Consumer-to-standalone-Spark-JavaStream-Consumer-tp23930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]