From what I understand of your code, it is reading data from the different partitions of a topic one after another: all data from partition 1, then from partition 2, and so on. But you have configured it to create just a single stream (topicCount maps the topic to a count of 1), so I am not sure what your intention is: to read all partitions serially, or in parallel.
If you want to get started with Kafka + Spark Streaming, I strongly suggest reading the Kafka integration guide - https://spark.apache.org/docs/latest/streaming-kafka-integration.html - and running the examples for the two approaches:

- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

Since you already understand the high-level consumer idea, you may want to start with the first, receiver-based approach, which uses the HL consumer as well and takes topic counts. A minimal sketch of that approach is appended below the quoted message.

On Tue, Jul 21, 2015 at 8:23 AM, Hafsa Asif <hafsa.a...@matchinguu.com> wrote:

> Hi, I have a simple high-level Kafka consumer like:
>
> package matchinguu.kafka.consumer;
>
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.ConsumerIterator;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
>
> import java.util.*;
>
> public class SimpleHLConsumer {
>
>     private final ConsumerConnector consumer;
>     private final String topic;
>
>     public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
>         Properties props = new Properties();
>         props.put("zookeeper.connect", zookeeper);
>         props.put("group.id", groupId);
>         props.put("zookeeper.session.timeout.ms", "500");
>         props.put("zookeeper.sync.time.ms", "250");
>         props.put("auto.commit.interval.ms", "1000");
>
>         consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
>         this.topic = topic;
>     }
>
>     public void testConsumer() {
>         Map<String, Integer> topicCount = new HashMap<String, Integer>();
>         topicCount.put(topic, 1);
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>                 consumer.createMessageStreams(topicCount);
>         List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
>         for (final KafkaStream stream : streams) {
>             ConsumerIterator<byte[], byte[]> it = stream.iterator();
>             while (it.hasNext()) {
>                 System.out.println("------------");
>                 System.out.println("Message from Single Topic: " +
>                         new String(it.next().message()));
>             }
>         }
>         if (consumer != null) {
>             System.out.println("Shutdown Happens");
>             consumer.shutdown();
>         }
>     }
>
>     public static void main(String[] args) {
>         System.out.println("Consumer is now reading messages from producer");
>         //String topic = args[0];
>         String topic = "test";
>         SimpleHLConsumer simpleHLConsumer =
>                 new SimpleHLConsumer("localhost:2181", "testgroup", topic);
>         simpleHLConsumer.testConsumer();
>     }
> }
>
> I want to get my messages through Spark Java Streaming with Kafka
> integration. Can anyone help me reform this code so that I can get the
> same output with Spark Kafka integration?
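To make the suggestion concrete, here is a minimal, untested sketch of what the receiver-based version of your consumer might look like in Java. It reuses the values from your code (ZooKeeper at localhost:2181, group "testgroup", topic "test", and the same topicCount map); the class name SparkHLConsumer, the local[2] master, and the 2-second batch interval are illustrative choices of mine, and you would need the spark-streaming-kafka artifact for your Spark version on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkHLConsumer {

    public static void main(String[] args) throws InterruptedException {
        // Local master with 2 threads: one for the Kafka receiver, one for processing
        SparkConf conf = new SparkConf()
                .setAppName("SparkHLConsumer")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));

        // Same topic -> thread-count map as topicCount in your high-level consumer
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("test", 1);

        // Receiver-based stream: backed by the Kafka high-level consumer, so the
        // ZooKeeper quorum and group id play the same roles as in your code
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "localhost:2181", "testgroup", topicCount);

        // Each record is a (key, message) pair; keep only the message payload
        JavaDStream<String> lines = messages.map(tuple -> tuple._2());

        // Print a sample of each batch, analogous to your System.out loop
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

The direct approach shown in DirectKafkaWordCount (KafkaUtils.createDirectStream) skips ZooKeeper and the receiver entirely and reads from the Kafka brokers directly, but it changes how offsets are tracked, so the receiver-based version above is the closer match to your current code.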