Aah, glad you found it out.

TD


On Tue, Jul 15, 2014 at 7:52 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Thanks Tathagata, we actually found the problem: I created the SQLContext and
> the StreamingContext from different SparkContexts. But thanks for your help.
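>
> In case it helps anyone else, here is roughly what the fix looks like (a
> sketch, not our exact code): create a single SparkContext and build both
> the StreamingContext and the SQLContext from it.
>
>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>     val sc = new SparkContext(sparkConf)
>     // reuse the same SparkContext instead of letting StreamingContext
>     // create a second one
>     val ssc = new StreamingContext(sc, Seconds(10))
>     val sqlContext = new SQLContext(sc)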
>
> Best,
> Siyuan
>
>
> On Tue, Jul 15, 2014 at 6:53 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Oh yes, we have run SQL, Streaming, and MLlib all together.
>>
>> You can take a look at the demo <https://databricks.com/cloud> that
>> Databricks gave at the Spark Summit.
>>
>> I think I see what the problem is. sql("....") returns an RDD, and
>> println(rdd) prints only the RDD's name, while rdd.foreach(println) prints
>> the records on the executors, so you won't find anything in the driver
>> logs! So try doing a collect or take on the RDD returned by the SQL query
>> and print that.
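>>
>> Something like this (just a sketch, adapt to your own query) should make
>> the results show up in the driver logs:
>>
>>     val result = sql("select * from records")
>>     // collect() brings the results back to the driver
>>     result.collect().foreach(println)
>>     // or print only the first few records
>>     result.take(10).foreach(println)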
>>
>> TD
>>
>>
>> On Tue, Jul 15, 2014 at 4:28 PM, hsy...@gmail.com <hsy...@gmail.com>
>> wrote:
>>
>>> By the way, have you ever run SQL and streaming together? Do you know of
>>> any example that works? Thanks!
>>>
>>>
>>> On Tue, Jul 15, 2014 at 4:28 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> I could see the output of count, but no SQL results. Running in
>>>> standalone mode is not useful for me, so I just run it on my local
>>>> single-node YARN cluster. Thanks
>>>>
>>>>
>>>> On Tue, Jul 15, 2014 at 12:48 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Could you run it locally first to make sure it works and that you see
>>>>> output? Also, I recommend going through the previous step-by-step
>>>>> approach to narrow down where the problem is.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Mon, Jul 14, 2014 at 9:15 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Actually, I deployed this on a YARN cluster (via spark-submit) and I
>>>>>> couldn't find any output in the YARN stdout logs.
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 14, 2014 at 6:25 PM, Tathagata Das <
>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Can you make sure you are running locally with more than one local
>>>>>>> core? You could set the master in the SparkConf as
>>>>>>> conf.setMaster("local[4]"). Then see whether jobs are running on every
>>>>>>> batch of data in the Spark web UI (running on localhost:4040). If you
>>>>>>> still don't get any output, first try simply printing recRDD.count()
>>>>>>> in the foreachRDD (that is, test Spark Streaming first). If you can
>>>>>>> get that to work, then I would test the Spark SQL part.
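>>>>>>>
>>>>>>> For example (a rough sketch based on your code), to first verify that
>>>>>>> Spark Streaming itself is receiving data:
>>>>>>>
>>>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>>>       .setMaster("local[4]")
>>>>>>>     // ... create the StreamingContext and the Kafka stream as before ...
>>>>>>>     recordsStream.foreachRDD { recRDD =>
>>>>>>>       // count() is an action, so it forces each batch to be computed
>>>>>>>       println("Records in this batch: " + recRDD.count())
>>>>>>>     }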
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 14, 2014 at 5:25 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> No errors but no output either... Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 14, 2014 at 4:59 PM, Tathagata Das <
>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Could you elaborate on what problem you are facing? A compiler
>>>>>>>>> error? A runtime error? A class-not-found error? Not receiving any
>>>>>>>>> data from Kafka? Receiving data but the SQL command throwing an
>>>>>>>>> error? No errors but no output either?
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Jul 14, 2014 at 4:06 PM, hsy...@gmail.com <
>>>>>>>>> hsy...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> A couple of days ago, I tried to integrate SQL and streaming
>>>>>>>>>> together. My understanding is that I can transform each RDD from a
>>>>>>>>>> DStream into a SchemaRDD and execute SQL on it, but I've had no
>>>>>>>>>> luck. Would you guys help me take a look at my code? Thank you very
>>>>>>>>>> much!
>>>>>>>>>>
>>>>>>>>>> object KafkaSpark {
>>>>>>>>>>
>>>>>>>>>>   def main(args: Array[String]): Unit = {
>>>>>>>>>>     if (args.length < 4) {
>>>>>>>>>>       System.err.println("Usage: KafkaSpark <zkQuorum> <group> <topics> <numThreads>")
>>>>>>>>>>       System.exit(1)
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     val Array(zkQuorum, group, topics, numThreads) = args
>>>>>>>>>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>>>>>>>>>     val ssc =  new StreamingContext(sparkConf, Seconds(10))
>>>>>>>>>>     val sc = new SparkContext(sparkConf)
>>>>>>>>>>     val sqlContext = new SQLContext(sc);
>>>>>>>>>> //    ssc.checkpoint("checkpoint")
>>>>>>>>>>
>>>>>>>>>>     // Importing the SQL context gives access to all the SQL
>>>>>>>>>>     // functions and implicit conversions.
>>>>>>>>>>     import sqlContext._
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     val tt = Time(10000)
>>>>>>>>>>     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>>>>>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>>>>>>>>>       .map(t => getRecord(t._2.split("#")))
>>>>>>>>>>
>>>>>>>>>>     val result = recordsStream.foreachRDD((recRDD, tt)=>{
>>>>>>>>>>       recRDD.registerAsTable("records")
>>>>>>>>>>       val result = sql("select * from records")
>>>>>>>>>>       println(result)
>>>>>>>>>>       result.foreach(println)
>>>>>>>>>>     })
>>>>>>>>>>
>>>>>>>>>>     ssc.start()
>>>>>>>>>>     ssc.awaitTermination()
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   def getRecord(l: Array[String]): Record = {
>>>>>>>>>>     println("Getting the record")
>>>>>>>>>>     Record(l(0), l(1))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
