Hey Chris,
Thanks for your quick help. It turned out the dataset itself had issues;
the logic I implemented was fine.
Here is what I did:
1) *V.Imp* - Create the rows by splitting each line of the tab-delimited
file into columns, before converting into a DF:
val stati = stat.map { x =>
  val f = x.split("\t")
  (f(0), f(1), f(2).toInt, f(3).toInt)
}
Do a take to check whether it throws an error (execution is lazy, so
nothing is actually parsed until an action runs):
stati.take(2)
*Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))
If this prints without an error, at least the first rows parse correctly
and we can proceed.
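A note on the take(2) check: because execution is lazy, take(2) only parses the first rows, so a malformed line further down the file could still fail later, for example in show. The sketch below mimics this with a plain Scala Iterator instead of an RDD. The object name LazyParseDemo and the third sample line are made up for illustration and are not from the actual dataset.

```scala
import scala.util.Try

object LazyParseDemo {
  // Two well-formed sample lines plus one hypothetical malformed line
  // that is missing its last two fields.
  val lines = List("uihgf\tPune\t56\t5", "asfsds\t***\t43\t1", "broken\tline")

  // Same parsing logic as in step 1: assumes four tab-separated fields.
  def parse(x: String): (String, String, Int, Int) = {
    val f = x.split("\t")
    (f(0), f(1), f(2).toInt, f(3).toInt)
  }

  // Iterator.map is lazy, much like an RDD transformation: only the two
  // elements requested by take(2) are ever parsed.
  val firstTwo: List[(String, String, Int, Int)] =
    lines.iterator.map(parse).take(2).toList

  // Consuming every element reaches the malformed line and fails there.
  val all: Try[List[(String, String, Int, Int)]] =
    Try(lines.iterator.map(parse).toList)
}
```

Here firstTwo succeeds, while all is a Failure wrapping an ArrayIndexOutOfBoundsException, which is the same kind of error reported for station.show in the original question.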
2) *V.Imp* - Now convert it into a DF:
val station = stati.toDF("StationKey","StationName","Temparature","StationID")
Now do a show to see what it looks like:
station.show
*Ans:*
+----------+-----------+-----------+---------+
|StationKey|StationName|Temparature|StationID|
+----------+-----------+-----------+---------+
|     uihgf|       Pune|         56|        5|
|    asfsds|        ***|         43|        1|
|    fkwsdf|     Mumbai|         45|        6|
|      gddg|       ABCD|         32|        2|
|     grgzg|     *CSD**|         35|        3|
|     gsrsn|     Howrah|         22|        4|
|     ffafv|        ***|         34|        7|
+----------+-----------+-----------+---------+
3) Do the same for the other dataset:
i) val storr = stor.map { p => val f = p.split("\t"); (f(0).toInt, f(1), f(2).toInt, f(3)) }
ii) storr.take(2)
iii) val storm = storr.toDF("ID","Name","Temp","Code")
iv) storm.show
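Both parsing steps assume that every line has exactly four tab-separated fields and that the numeric columns really contain integers. If the input might contain bad lines, one option is to skip them instead of failing. This is only a sketch: SafeParse and parseStation are names invented here, and silently dropping bad rows is an assumption about what is acceptable for the data.

```scala
import scala.util.Try

object SafeParse {
  // Returns None when the line does not have exactly four fields or when
  // a numeric field does not parse; otherwise returns the typed tuple.
  def parseStation(line: String): Option[(String, String, Int, Int)] =
    // limit = -1 keeps trailing empty fields instead of dropping them
    line.split("\t", -1) match {
      case Array(key, name, temp, id) =>
        Try((key, name, temp.trim.toInt, id.trim.toInt)).toOption
      case _ => None
    }
}
```

On an RDD this should be usable as stat.flatMap(line => SafeParse.parseStation(line)), so that lines returning None are simply left out.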
4) Register both as temp tables (registerTempTable returns Unit, so there
is nothing to assign to a val):
station.registerTempTable("Stations")
storm.registerTempTable("Storms")
5) Join the two tables and filter as required (triple quotes let the
query span several lines):
val joinedDF = sqlContext.sql("""Select Stations.StationName as StationName,
Stations.StationID as StationID from Stations inner join Storms on
Storms.Code = Stations.StationKey where Stations.Temparature > 35""")
6) joinedDF.show
+-----------+---------+
|StationName|StationID|
+-----------+---------+
|       Pune|        5|
+-----------+---------+
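To make it clearer what the query does, here is the same inner join plus filter written against plain Scala collections. It is only an illustration: the storm rows are hypothetical (the Storms file contents are not shown in this thread), and the case classes and the name JoinSketch exist only for this sketch.

```scala
object JoinSketch {
  case class Station(key: String, name: String, temperature: Int, id: Int)
  case class Storm(id: Int, name: String, temp: Int, code: String)

  // A few station rows taken from the show output in step 2.
  val stations = List(
    Station("uihgf", "Pune", 56, 5),
    Station("asfsds", "***", 43, 1),
    Station("fkwsdf", "Mumbai", 45, 6),
    Station("gddg", "ABCD", 32, 2)
  )

  // Hypothetical storm rows, invented for this example.
  val storms = List(
    Storm(1, "stormA", 30, "uihgf"),
    Storm(2, "stormB", 28, "gddg")
  )

  // Inner join on Storms.Code = Stations.StationKey, keeping only
  // stations with a temperature above 35.
  val joined: List[(String, Int)] =
    for {
      st <- stations if st.temperature > 35
      sm <- storms if sm.code == st.key
    } yield (st.name, st.id)
}
```

With these sample rows, joined contains only ("Pune", 5): gddg has a matching storm but is filtered out by the temperature condition, and the other stations have no matching storm code.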
7) Save the result as CSV:
joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")
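One caveat with mkString(","): a field that itself contains a comma or a double quote would produce a broken CSV line. A possible way around that is to quote such fields before joining them. The helper below is a sketch following the usual CSV quoting convention; CsvSketch, escape and toCsvLine are names made up here, not part of Spark.

```scala
object CsvSketch {
  // Wrap a field in double quotes when it contains a comma, a quote or a
  // line break, doubling any embedded quotes; otherwise return it as is.
  def escape(field: Any): String = {
    val s = String.valueOf(field)
    if (s.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + s.replace("\"", "\"\"") + "\""
    else s
  }

  // Build one CSV line from the values of a row.
  def toCsvLine(fields: Seq[Any]): String =
    fields.map(escape).mkString(",")
}
```

In the save step this could replace the mkString call, for example .rdd.map(row => CsvSketch.toCsvLine(row.toSeq)), assuming the rows hold only simple values such as strings and numbers.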
Thanks,
Aakash.
On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
[email protected]> wrote:
> Hi Aakash,
>
> You can try this:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
> val header = Array("col1", "col2", "col3", "col4")
> val schema = StructType(header.map(StructField(_, StringType, true)))
>
> val statRow = stat.map(line => Row(line.split("\t"):_*))
> val df = spark.createDataFrame(statRow, schema)
>
> df.show
> +------+------+----+----+
> |  col1|  col2|col3|col4|
> +------+------+----+----+
> | uihgf| Paris|  56|   5|
> |asfsds|   ***|  43|   1|
> |fkwsdf|London|  45|   6|
> |  gddg|  ABCD|  32|   2|
> | grgzg|  *CSD|  35|   3|
> | gsrsn|  ADR*|  22|   4|
> +------+------+----+----+
>
> Please let me know if this works for you.
>
> Regards,
> Christophe.
>
>
> On 17/02/17 10:37, Aakash Basu wrote:
>
> Hi all,
>
>
> Without using a case class, I tried to build a DF to use for a join and
> some filtering later. But I'm getting an ArrayIndexOutOfBoundsException
> when calling show on the DF.
>
>
> 1) Importing SQLContext=
>
> import org.apache.spark.sql.SQLContext._
>
> import org.apache.spark.sql.SQLContext
>
>
>
> 2) Initializing SQLContext=
>
> val sqlContext = new SQLContext(sc)
>
>
>
> 3) Importing implicits package for toDF conversion=
>
> import sqlContext.implicits._
>
>
>
> 4) Reading the Station and Storm Files=
>
> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>
> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>
>
>
>
>
> stat.foreach(println)
>
>
> uihgf Paris 56 5
>
> asfsds *** 43 1
>
> fkwsdf London 45 6
>
> gddg ABCD 32 2
>
> grgzg *CSD 35 3
>
> gsrsn ADR* 22 4
>
>
> 5) Creating row by segregating columns after reading the tab delimited
> file before converting into DF=
>
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2),x.split("\t")(3)))
>
>
>
> 6) Converting into DF=
>
> val station = stati.toDF()
>
> station.show gives the error below:
>
> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID
> 15)
> java.lang.ArrayIndexOutOfBoundsException: 1
>
>
> Please help!
>
> Thanks,
> Aakash.
>
>
>