Have you tried adding an Encoder for columns, as suggested by Jungtaek Lim?
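A minimal, self-contained sketch of what that suggestion could look like (the object name and session settings are made up for illustration, and columns is assumed to be a top-level case class as in the thread below):

import org.apache.spark.sql.{Encoders, SparkSession}

// Declared at the top level (not inside a method) so Spark can derive a
// product Encoder for it.
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object EncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("encoder-sketch").getOrCreate()

    // Pass the Encoder explicitly instead of relying on the implicit conversion.
    val ds = spark.createDataset(Seq(
      columns("34e07d9f-829a-446a-93ab-8b93aa8eda41", "SAP", "2018-09-05T23:22:34", 56.89f)
    ))(Encoders.product[columns])

    val df = ds.toDF()
    df.show(false)

    spark.stop()
  }
}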
On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


I can rebuild the comma-separated list as follows:

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
import sqlContext.implicits._

for(line <- pricesRDD.collect.toArray)
{
   var key = line._2.split(',').view(0).toString
   var ticker = line._2.split(',').view(1).toString
   var timeissued = line._2.split(',').view(2).toString
   var price = line._2.split(',').view(3).toFloat
   var allInOne = key + "," + ticker + "," + timeissued + "," + price
   println(allInOne)
}

and the print shows the columns separated by ",":

34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89

So I just need to convert that row into a DataFrame.

I try this conversion to a DF to write to a MongoDB document with MongoSpark.save(df, writeConfig):

var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

[error] /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235: value toDF is not a member of org.apache.spark.rdd.RDD[columns]
[error]     var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

Frustrating! Has anyone come across this?

thanks

On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

yep, already tried it and it did not work.

thanks
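One commonly suggested fix for the "value toDF is not a member of org.apache.spark.rdd.RDD[columns]" error is to declare the case class at the top level of the file (outside the object or method that uses it) and to import the implicits from a SparkSession after the session is created. A minimal sketch under those assumptions (the object name and session settings are placeholders):

import org.apache.spark.sql.SparkSession

// Top-level declaration, outside any object or method, so the implicit
// product Encoder for it can be derived.
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object RddToDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rdd-to-df-sketch").getOrCreate()
    // Imported after the session exists; this brings rddToDatasetHolder into scope.
    import spark.implicits._

    val row = columns("34e07d9f-829a-446a-93ab-8b93aa8eda41", "SAP",
      "2018-09-05T23:22:34", 56.89f)

    val df = spark.sparkContext.parallelize(Seq(row)).toDF()
    df.show(false)
    // df could then be handed on, e.g. to MongoSpark.save(df, writeConfig).

    spark.stop()
  }
}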
On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <deepakmc...@gmail.com> wrote:

Try this:

import spark.implicits._

df.toDF()

On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

With the following

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

var key = line._2.split(',').view(0).toString
var ticker = line._2.split(',').view(1).toString
var timeissued = line._2.split(',').view(2).toString
var price = line._2.split(',').view(3).toFloat

var df = Seq(columns(key, ticker, timeissued, price))
println(df)

I get

List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))

so I just need to convert that list to a DF.

On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks!

The Spark version is 2.3.0.

On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <kabh...@gmail.com> wrote:

You may also find the link below useful (though it looks fairly old). Since a case class is a type for which an Encoder is available, there may be another reason that prevents the implicit conversion.

https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973

And which Spark version do you use?

On Wed, Sep 5, 2018 at 5:32 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

Sorry, I guess I pasted the wrong method. The code is:

implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}

On Wed, Sep 5, 2018 at 5:30 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

I guess you need to have an Encoder for the type returned by columns().
https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}

You can see lots of Encoder implementations in the Scala code. If your type doesn't match any of them it may not work, and you would need to provide a custom Encoder.

-Jungtaek Lim (HeartSaVioR)

On Wed, Sep 5, 2018 at 5:24 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks

I already do that, as below:

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
import sqlContext.implicits._

but I am still getting the error!

On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabh...@gmail.com> wrote:

You may need to import the implicits from your SparkSession, like below (the code is borrowed from https://spark.apache.org/docs/latest/sql-programming-guide.html):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

On Wed, Sep 5, 2018 at 5:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I have a Spark Streaming job that sends data, and I need to put that data into MongoDB for test purposes.
The easiest way is to create a DF from the individual list of columns, as below.

I loop over the individual rows in the RDD and perform the following:

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

for(line <- pricesRDD.collect.toArray)
{
   var key = line._2.split(',').view(0).toString
   var ticker = line._2.split(',').view(1).toString
   var timeissued = line._2.split(',').view(2).toString
   var price = line._2.split(',').view(3).toFloat
   val priceToString = line._2.split(',').view(3)
   if (price > 90.0)
   {
      println("price > 90.0, saving to MongoDB collection!")
      // Save prices to mongoDB collection
      var df = Seq(columns(key, ticker, timeissued, price)).toDF

but it fails with the message

value toDF is not a member of Seq[columns]

What would be the easiest way of resolving this, please?

thanks
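As an alternative sketch, not taken from the thread: assuming pricesRDD is an RDD of (key, value) pairs whose value is the comma-separated string shown above, the whole RDD could be parsed and converted once, rather than calling toDF on a one-element Seq inside the loop. The object name, session settings, and sample data below are made up for illustration:

import org.apache.spark.sql.SparkSession

// Assumed to be a top-level declaration, as in the earlier sketches.
case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object PricesToDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("prices-to-df-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the RDD of (key, "uuid,ticker,time,price") records.
    val pricesRDD = spark.sparkContext.parallelize(Seq(
      ("k1", "34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89"),
      ("k2", "ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5")))

    // Parse every record once and convert the whole RDD in one go,
    // keeping only the rows with PRICE above 90.
    val df = pricesRDD
      .map(_._2.split(','))
      .map(a => columns(a(0), a(1), a(2), a(3).toFloat))
      .toDF()
      .filter($"PRICE" > 90.0)

    df.show(false)
    // The filtered DataFrame could then be written out,
    // e.g. with MongoSpark.save(df, writeConfig).

    spark.stop()
  }
}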