If you want to stop the streaming after 10 seconds, use ssc.awaitTermination(10000), which blocks for at most 10,000 ms before returning. Make sure you push some data to Kafka within those 10 seconds so the stream has something to consume.
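For concreteness, here is a minimal sketch of that pattern, assuming Spark 1.1.x with the 2-second batch interval used in this thread; the Zookeeper address and consumer group are taken from the logs below, and the topic name "test" is a placeholder:

    import java.util.Collections;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class TimedKafkaWordCount {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("JavaKafkaWordCount");
            // 2000 ms batch interval, matching the duration used in this thread
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(2000));

            // Zookeeper address and group from the logs below; topic "test" is a placeholder
            JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
                    ssc, "10.0.1.232:2181", "c1", Collections.singletonMap("test", 1));
            messages.print(); // registers an output operation so the job can run

            ssc.start();
            ssc.awaitTermination(10000); // block for at most 10 seconds
            ssc.stop();                  // then stop the context and its receiver
        }
    }

Note that later Spark releases rename the timed variant to awaitTerminationOrTimeout(long), so check the API of the version you build against.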
Thanks
Best Regards

On Mon, Dec 29, 2014 at 1:53 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:

> I'm very close! So I added that, and then I added this:
> http://mvnrepository.com/artifact/org.apache.kafka/kafka-clients/0.8.2-beta
>
> and it seems as though the stream is working, as it says "Stream 0 received
> 1 or 2 blocks" as I enter messages on my Kafka producer. However, the
> Receiver seems to keep trying every 2 seconds (as I've set the batch
> duration to 2000 ms in my Java app). How can I stop the Receiver from
> consuming messages after 10 seconds and output the word count to the console?
>
> Thanks a lot for all the help! I'm excited to see this word count :)
>
> Suhas Shekar
>
> University of California, Los Angeles
> B.A. Economics, Specialization in Computing 2014
>
> On Mon, Dec 29, 2014 at 12:04 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Add this jar to the dependencies:
>> http://mvnrepository.com/artifact/com.yammer.metrics/metrics-core/2.2.0
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Dec 29, 2014 at 1:31 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>>
>>> Hello Akhil,
>>>
>>> I changed my Kafka dependency to 2.10 (which is the version of Kafka
>>> that was on 10.0.1.232). I am getting a slightly different error, but at
>>> the same place as the previous error (pasted below).
>>>
>>> FYI, when I make these changes to the pom file, I run "mvn clean package"
>>> and then cp the new jar files from the repository into my lib of jar files,
>>> which is an argument to my spark-submit script (see my original post).
>>>
>>> Thanks again for the time and help...much appreciated.
>>>
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Starting receiver
>>> 14/12/29 07:56:00 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>> 14/12/29 07:56:00 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>> 14/12/29 07:56:00 INFO BlockGenerator: Started block pushing thread
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Verifying properties
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property group.id is overridden to c1
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to 10.0.1.232:2181
>>> 14/12/29 07:56:00 INFO VerifiableProperties: Property zookeeper.connection.timeout.ms is overridden to 10000
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Stopping receiver with
>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>> com/yammer/metrics/Metrics
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Called receiver onStop
>>> 14/12/29 07:56:00 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>> 14/12/29 07:56:00 ERROR ReceiverTracker: Deregistered receiver for
>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> com/yammer/metrics/Metrics
>>>         at kafka.metrics.KafkaMetricsGroup$class.newMeter(KafkaMetricsGroup.scala:51)
>>>         at kafka.consumer.ZookeeperConsumerConnector.newMeter(ZookeeperConsumerConnector.scala:83)
>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:107)
>>>         at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
>>>         at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89)
>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:97)
>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.Metrics
>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>         ... 18 more
>>>
>>> Suhas Shekar
>>>
>>> University of California, Los Angeles
>>> B.A. Economics, Specialization in Computing 2014
>>>
>>> On Sun, Dec 28, 2014 at 11:52 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>>>
>>>> I made both versions 1.1.1 and got the same error. I then tried making
>>>> both 1.1.0, as that is the version of my Spark core, but I got the same
>>>> error.
>>>>
>>>> I noticed my Kafka dependency is for Scala 2.9.2, while my Spark
>>>> Streaming Kafka dependency is for 2.10.x. I will try changing that next,
>>>> but I don't think that will solve the error, as I don't think the
>>>> application had gotten to that level yet.
>>>>
>>>> Please let me know of any possible next steps.
>>>>
>>>> Thank you again for the time and the help!
>>>>
>>>> Suhas Shekar
>>>>
>>>> University of California, Los Angeles
>>>> B.A. Economics, Specialization in Computing 2014
>>>>
>>>> On Sun, Dec 28, 2014 at 11:31 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> I just looked at the pom file that you are using. Why do you have
>>>>> different versions in it?
>>>>>
>>>>> <dependency>
>>>>>   <groupId>org.apache.spark</groupId>
>>>>>   <artifactId>spark-streaming-kafka_2.10</artifactId>
>>>>>   <version>*1.1.1*</version>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.spark</groupId>
>>>>>   <artifactId>spark-streaming_2.10</artifactId>
>>>>>   <version>*1.0.2*</version>
>>>>> </dependency>
>>>>>
>>>>> Can you make both versions the same?
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Mon, Dec 29, 2014 at 12:44 PM, Suhas Shekar <suhsheka...@gmail.com> wrote:
>>>>>
>>>>>> 1) Could you please clarify what you mean by checking that the Scala
>>>>>> version is correct? In my pom.xml file it is 2.10.4 (which is the same
>>>>>> as when I start spark-shell).
>>>>>>
>>>>>> 2) The Spark master URL is definitely correct, as I have run other
>>>>>> apps with the same script that use Spark (like a word count with a
>>>>>> local file).
>>>>>>
>>>>>> Thank you for the help!
>>>>>>
>>>>>> Suhas Shekar
>>>>>>
>>>>>> University of California, Los Angeles
>>>>>> B.A. Economics, Specialization in Computing 2014
>>>>>>
>>>>>> On Sun, Dec 28, 2014 at 11:04 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Make sure you verify the following:
>>>>>>>
>>>>>>> - Scala version: I think the correct version would be 2.10.x
>>>>>>> - Spark master URL: be sure that you copied the one displayed at the
>>>>>>>   top left corner of the web UI (running on port 8080)
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 12:26 PM, suhshekar52 <suhsheka...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Everyone,
>>>>>>>>
>>>>>>>> Thank you for the time and the help :).
>>>>>>>>
>>>>>>>> My goal here is to get this program working:
>>>>>>>> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
>>>>>>>>
>>>>>>>> The only lines I do not have from the example are lines 62-67.
>>>>>>>>
>>>>>>>> Background: I have EC2 instances running. Standalone Spark is
>>>>>>>> running on top of Cloudera Manager 5.2. The pom file is attached and
>>>>>>>> is the same for both clusters:
>>>>>>>> pom.xml
>>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/pom.xml>
>>>>>>>>
>>>>>>>> Here are a few different approaches I have taken and the issues I
>>>>>>>> run into:
>>>>>>>>
>>>>>>>> *Standalone Mode*
>>>>>>>>
>>>>>>>> 1) Use the spark-submit script to run:
>>>>>>>>
>>>>>>>> /opt/cloudera/parcels/CDH-5.2.1-1.cdh5.2.1.p0.12/lib/spark/bin/spark-submit
>>>>>>>> --class SimpleApp --master spark://10.0.1.230:7077 --jars $(echo
>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/lib/*.jar | tr ' ' ',')
>>>>>>>> /home/ec2-user/sparkApps/SimpleApp/target/simple-project-1.0.jar
>>>>>>>>
>>>>>>>> Interestingly, I was previously getting an error like this: "Initial
>>>>>>>> job has not accepted any resources; check your cluster UI".
>>>>>>>>
>>>>>>>> Now, when I run, it prints out the 3 Hello World statements in my code:
>>>>>>>> KafkaJavaConsumer.txt
>>>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n20879/KafkaJavaConsumer.txt>
>>>>>>>>
>>>>>>>> and then it seems to try to start the Kafka stream, but fails:
>>>>>>>>
>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Starting Kafka Consumer Stream with group: c1
>>>>>>>> 14/12/29 05:58:05 INFO ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
>>>>>>>> 14/12/29 05:58:05 INFO KafkaReceiver: Connecting to Zookeeper: 10.0.1.232:2181
>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Started block pushing thread
>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopping receiver with
>>>>>>>> message: Error starting receiver 0: java.lang.NoClassDefFoundError:
>>>>>>>> scala/reflect/ClassManifest
>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Called receiver onStop
>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Deregistering receiver 0
>>>>>>>> 14/12/29 05:58:05 ERROR ReceiverTracker: Deregistered receiver for
>>>>>>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>>>>>>> scala/reflect/ClassManifest
>>>>>>>>         at kafka.utils.Log4jController$.<init>(Log4jController.scala:29)
>>>>>>>>         at kafka.utils.Log4jController$.<clinit>(Log4jController.scala)
>>>>>>>>         at kafka.utils.Logging$class.$init$(Logging.scala:29)
>>>>>>>>         at kafka.utils.VerifiableProperties.<init>(VerifiableProperties.scala:26)
>>>>>>>>         at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:94)
>>>>>>>>         at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:96)
>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>>>>>>>         at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
>>>>>>>>         at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>>>>>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>>>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>>>>>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>>>>>>         at java.lang.Thread.run(Thread.java:722)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: scala.reflect.ClassManifest
>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>>>>>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
>>>>>>>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
>>>>>>>>         ... 18 more
>>>>>>>>
>>>>>>>> 14/12/29 05:58:05 INFO ReceiverSupervisorImpl: Stopped receiver 0
>>>>>>>> 14/12/29 05:58:05 INFO BlockGenerator: Stopping BlockGenerator
>>>>>>>>
>>>>>>>> I ran into a couple of other class-not-found errors and was able to
>>>>>>>> solve them by adding dependencies to the pom file, but I have not
>>>>>>>> found such a solution for this error.
>>>>>>>>
>>>>>>>> On the Kafka side of things, I am simply typing in messages on
>>>>>>>> another console as soon as I start the Java app. Is this okay?
>>>>>>>>
>>>>>>>> I have not set up an advertised host on the Kafka side, as I was
>>>>>>>> still able to receive messages from other consoles by setting up a
>>>>>>>> consumer to listen on the private IP:port. Is this okay?
>>>>>>>>
>>>>>>>> Lastly, is there a command, like --from-beginning for a console
>>>>>>>> consumer, for the Java application to get messages from the beginning?
>>>>>>>>
>>>>>>>> Thanks a lot for the help and happy holidays!
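[Editor's note: for reference, the dependency fixes discussed in this thread (aligning the spark-streaming versions, using a Scala 2.10 Kafka build, and adding metrics-core) can be sketched as one pom.xml fragment. The exact versions below are assumptions matching the thread (Spark 1.1.0 on Scala 2.10, a Kafka 0.8.x broker) and should be adjusted to your cluster:

    <!-- A minimal sketch of aligned dependencies; assumed versions, adjust to your cluster -->
    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
      </dependency>
      <!-- Scala 2.10 build of Kafka; the 2.9.2 build caused the scala/reflect/ClassManifest error -->
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
      </dependency>
      <!-- Supplies com.yammer.metrics.Metrics, the class missing in the NoClassDefFoundError above -->
      <dependency>
        <groupId>com.yammer.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>2.2.0</version>
      </dependency>
    </dependencies>

All of these jars must also reach the executors, e.g. via the --jars argument to spark-submit as in the script quoted above.]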