Have you tried adding an Encoder for columns, as suggested by Jungtaek Lim?
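
In case it helps, here is a minimal sketch of that idea. It assumes the toDF
error comes from the case class being declared inside the method that uses it
(the Cloudera thread quoted further down points the same way), so the case
class sits at the top level and the Encoder is passed explicitly instead of
relying on sqlContext.implicits._. The names Columns, EncoderSketch and
parseLine are made up for illustration, the local master is only for trying it
out, and this is untested against your streaming job.

```scala
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}

// Assumption: declared at the top level of the file, not inside a method,
// so that Spark can derive an Encoder for it.
case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object EncoderSketch {
  // Split one comma-separated record into the case class (hypothetical helper).
  def parseLine(line: String): Columns = {
    val fields = line.split(',')
    Columns(fields(0), fields(1), fields(2), fields(3).toFloat)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("encoder-sketch")
      .master("local[*]") // assumption: local run for a quick test
      .getOrCreate()

    // Explicit Encoder for the case class, picked up by createDataset below.
    implicit val columnsEncoder: Encoder[Columns] = Encoders.product[Columns]

    val row = parseLine("34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89")

    // Build a one-row Dataset and view it as a DataFrame.
    val df: DataFrame = spark.createDataset(Seq(row)).toDF()
    df.show(false)

    spark.stop()
  }
}
```

If this compiles in your project, the resulting df should be usable with the
MongoSpark.save(df, writeConfig) call you already have.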

On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> I can rebuild the comma separated list as follows:
>
>
>    case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
> PRICE: Float)
>     val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
>     import sqlContext.implicits._
>
>
>          for(line <- pricesRDD.collect.toArray)
>          {
>            var key = line._2.split(',').view(0).toString
>            var ticker =  line._2.split(',').view(1).toString
>            var timeissued = line._2.split(',').view(2).toString
>            var price = line._2.split(',').view(3).toFloat
>            var allInOne = key+","+ticker+","+timeissued+","+price
>            println(allInOne)
>
> and the print shows the columns separated by ","
>
>
> 34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
>
> So I just need to convert that row into a DataFrame.
>
> I tried converting it to a DataFrame so that I can write it to a MongoDB
> document with MongoSpark.save(df, writeConfig):
>
> var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued,
> price))).toDF
>
> [error]
> /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235:
> value toDF is not a member of org.apache.spark.rdd.RDD[columns]
> [error]             var df = sparkContext.parallelize(Seq(columns(key,
> ticker, timeissued, price))).toDF
> [
>
>
> frustrating!
>
> Has anyone come across this?
>
> thanks
>
> On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Yep, I already tried it and it did not work.
>>
>> thanks
>>
>> On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <deepakmc...@gmail.com> wrote:
>>
>>> Try this:
>>>
>>> import spark.implicits._
>>>
>>> df.toDF()
>>>
>>>
>>> On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> With the following
>>>>
>>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String,
>>>> PRICE: Float)
>>>>
>>>>  var key = line._2.split(',').view(0).toString
>>>>  var ticker =  line._2.split(',').view(1).toString
>>>>  var timeissued = line._2.split(',').view(2).toString
>>>>  var price = line._2.split(',').view(3).toFloat
>>>>
>>>>   var df = Seq(columns(key, ticker, timeissued, price))
>>>>  println(df)
>>>>
>>>> I get
>>>>
>>>>
>>>> List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
>>>>
>>>> So I just need to convert that list to a DataFrame.
>>>>
>>>> On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks!
>>>>>
>>>>> The Spark version is 2.3.0.
>>>>>
>>>>> On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>
>>>>>> You may also find the link below useful (though it looks quite old). An
>>>>>> Encoder is already available for case classes, so there may be another
>>>>>> reason that prevents the implicit conversion.
>>>>>>
>>>>>>
>>>>>> https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
>>>>>>
>>>>>> And which Spark version do you use?
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 5, 2018 at 5:32 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>
>>>>>>> Sorry, I guess I pasted the wrong method. The code is...
>>>>>>>
>>>>>>> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): 
>>>>>>> DatasetHolder[T] = {
>>>>>>>   DatasetHolder(_sqlContext.createDataset(s))
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I guess you need an Encoder for the result type of columns().
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>>>>>>>
>>>>>>>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): 
>>>>>>>> DatasetHolder[T] = {
>>>>>>>>   DatasetHolder(_sqlContext.createDataset(rdd))
>>>>>>>> }
>>>>>>>>
>>>>>>>> You can see lots of Encoder implementations in the Scala code. If
>>>>>>>> your type doesn't match any of them, it may not work and you will need
>>>>>>>> to provide a custom Encoder.
>>>>>>>>
>>>>>>>> -Jungtaek Lim (HeartSaVioR)
>>>>>>>>
>>>>>>>> On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> I already do that as below
>>>>>>>>>
>>>>>>>>>     val sqlContext= new
>>>>>>>>> org.apache.spark.sql.SQLContext(sparkContext)
>>>>>>>>>   import sqlContext.implicits._
>>>>>>>>>
>>>>>>>>> but still getting the error!
>>>>>>>>>
>>>>>>>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> You may need to import implicits from your spark session like
>>>>>>>>>> below:
>>>>>>>>>> (Below code is borrowed from
>>>>>>>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>>> val spark = SparkSession
>>>>>>>>>>   .builder()
>>>>>>>>>>   .appName("Spark SQL basic example")
>>>>>>>>>>   .config("spark.some.config.option", "some-value")
>>>>>>>>>>   .getOrCreate()
>>>>>>>>>> // For implicit conversions like converting RDDs to DataFrames
>>>>>>>>>> import spark.implicits._
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 5, 2018 at 5:11 PM, Mich Talebzadeh <
>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I have a Spark Streaming job that sends data, and I need to put that
>>>>>>>>>>> data into MongoDB for test purposes. The easiest way is to create a
>>>>>>>>>>> DataFrame from the individual columns, as below.
>>>>>>>>>>>
>>>>>>>>>>> I loop over individual rows in RDD and perform the following
>>>>>>>>>>>
>>>>>>>>>>>     case class columns(KEY: String, TICKER: String, TIMEISSUED:
>>>>>>>>>>> String, PRICE: Float)
>>>>>>>>>>>
>>>>>>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>>>>>>          {
>>>>>>>>>>>             var key = line._2.split(',').view(0).toString
>>>>>>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>>>>>>            if (price > 90.0)
>>>>>>>>>>>            {
>>>>>>>>>>>              println("price > 90.0, saving to MongoDB collection!")
>>>>>>>>>>>             // Save prices to mongoDB collection
>>>>>>>>>>>             var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>>>>>>>>>
>>>>>>>>>>> but it fails with message
>>>>>>>>>>>
>>>>>>>>>>>  value toDF is not a member of Seq[columns].
>>>>>>>>>>>
>>>>>>>>>>> What would be the easiest way of resolving this please?
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>>
>>>>>>>>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
