Could you run it locally first to make sure it works and that you see output?
Also, I recommend going through the step-by-step approach I described
previously to narrow down where the problem is.

TD


On Mon, Jul 14, 2014 at 9:15 PM, [email protected] <[email protected]> wrote:

> Actually, I deployed this on a YARN cluster (with spark-submit), and I
> couldn't find any output in the YARN stdout logs.
>
>
> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <
> [email protected]> wrote:
>
>> Can you make sure you are running locally with more than one local core?
>> You could set the master in the SparkConf with conf.setMaster("local[4]").
>> Then see if there are jobs running for every batch of data in the Spark
>> web UI (running on localhost:4040). If you still don't get any output,
>> first try simply printing recRDD.count() in the foreachRDD (that is, first
>> test Spark Streaming). If you can get that to work, then I would test the
>> Spark SQL stuff.
>>
>> TD
>>
>>
>> On Mon, Jul 14, 2014 at 5:25 PM, [email protected] <[email protected]>
>> wrote:
>>
>>> No errors but no output either... Thanks!
>>>
>>>
>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
>>> [email protected]> wrote:
>>>
>>>> Could you elaborate on the problem you are facing? Compiler
>>>> error? Runtime error? Class-not-found error? Not receiving any data from
>>>> Kafka? Receiving data but SQL command throwing error? No errors but no
>>>> output either?
>>>>
>>>> TD
>>>>
>>>>
>>>> On Mon, Jul 14, 2014 at 4:06 PM, [email protected] <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> A couple of days ago, I tried to integrate SQL and streaming. My
>>>>> understanding is that I can transform each RDD from a DStream into a
>>>>> SchemaRDD and execute SQL on it, but I had no luck.
>>>>> Could you help me take a look at my code? Thank you very much!
>>>>>
>>>>> object KafkaSpark {
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>     if (args.length < 4) {
>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>       System.exit(1)
>>>>>     }
>>>>>
>>>>>
>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>     val ssc =  new StreamingContext(sparkConf, Seconds(10))
>>>>>     val sc = new SparkContext(sparkConf)
>>>>>     val sqlContext = new SQLContext(sc);
>>>>> //    ssc.checkpoint("checkpoint")
>>>>>
>>>>>     // Importing the SQL context gives access to all the SQL functions
>>>>>     // and implicit conversions.
>>>>>     import sqlContext._
>>>>>
>>>>>
>>>>>     val tt = Time(10000)
>>>>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group,
>>>>> topicpMap).map(t => getRecord(t._2.split("#")))
>>>>>
>>>>>     val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>>>>       recRDD.registerAsTable("records")
>>>>>       val result = sql("select * from records")
>>>>>       println(result)
>>>>>       result.foreach(println)
>>>>>     })
>>>>>
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>
>>>>>   }
>>>>>
>>>>>   def getRecord(l:Array[String]):Record = {
>>>>>     println("Getting the record")
>>>>>     Record(l(0), l(1))}
>>>>> }
>>>>>
>>>>>
>>>>
>>>
>>
>
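
The first step suggested in the thread (run with a local[4] master and print recRDD.count() inside foreachRDD before touching Spark SQL) could be sketched roughly as below. This is only a sketch under assumptions, not the poster's actual fix: it assumes the Spark 1.x receiver-based Kafka API (KafkaUtils.createStream from spark-streaming-kafka), the Record case class is a guess at the shape the original code relies on, and the object name KafkaSparkCheck is made up for illustration. It also builds a single StreamingContext from the SparkConf instead of creating a separate SparkContext alongside it, which is an assumption about one possible source of trouble and is not something the thread confirms.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed shape of Record: the original code only shows Record(l(0), l(1)).
case class Record(key: String, value: String)

// Hypothetical name; a stripped-down variant of the KafkaSpark object in the thread.
object KafkaSparkCheck {

  // Same helper as in the original post, minus the println.
  def getRecord(l: Array[String]): Record = Record(l(0), l(1))

  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaSparkCheck <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // local[4]: more than one local core, so the Kafka receiver does not
    // occupy the only core and leave none for processing the batches.
    val conf = new SparkConf().setAppName("KafkaSparkCheck").setMaster("local[4]")

    // One context only: the StreamingContext creates its own SparkContext.
    val ssc = new StreamingContext(conf, Seconds(10))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val records = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map(t => getRecord(t._2.split("#")))

    // count() is an action, and this println runs on the driver once per batch.
    records.foreachRDD { rdd =>
      println("Records in this batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If a count shows up for every batch here, the streaming side is working and the SQL query can be added back inside foreachRDD. Note that result.foreach(println) in the original code prints on the executors, so on a cluster those lines would be expected in the executor logs and not in the driver's stdout.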
