Hey Chris,

Thanks for the quick help. It turns out the dataset itself had issues; the
logic I implemented was not wrong.

Here is what I did:

1)      *V.Imp* - Create one tuple per row by splitting each line of the
tab-delimited file into columns, before converting it into a DF:

val stati = stat.map { x =>
  val f = x.split("\t")  // split once and reuse the fields
  (f(0), f(1), f(2).toInt, f(3).toInt)
}



Run a take to check whether it throws an error. Transformations are
evaluated lazily, so nothing is actually parsed until an action runs:

stati.take(2)

*Ans:* res8: Array[(String, String, Int, Int)] = Array((uihgf,Pune,56,5),
(asfsds,***,43,1))

If this output appears, the parsing works for at least the first rows
(take(2) only evaluates as much data as it needs), and we can proceed.
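
The earlier ArrayIndexOutOfBoundsException: 1 means at least one line split
into a single field. As a rough sketch (not what I ran above), a parser that
returns None for such lines instead of throwing could look like this. The
names StationParsing and parseStation are hypothetical, and it assumes exactly
four tab-separated fields per line:

```scala
import scala.util.Try

object StationParsing {
  // Hypothetical helper: parse one tab-delimited line into the 4-field tuple
  // (String, String, Int, Int), returning None instead of throwing when the
  // line is malformed.
  def parseStation(line: String): Option[(String, String, Int, Int)] = {
    // limit -1 keeps trailing empty fields, so the length check is reliable
    val f = line.split("\t", -1)
    if (f.length != 4) None
    // Try turns a NumberFormatException from toInt into None
    else Try((f(0), f(1), f(2).trim.toInt, f(3).trim.toInt)).toOption
  }
}
```

Something like stat.filter(line => StationParsing.parseStation(line).isEmpty).take(5)
should then show the offending lines, if there are any.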

2)      *V.Imp* - Now convert it into a DF:

val station =
stati.toDF("StationKey","StationName","Temparature","StationID")



Now do a show to see what it looks like:

station.show

*Ans:*

+----------+-----------+-----------+---------+
|StationKey|StationName|Temparature|StationID|
+----------+-----------+-----------+---------+
|     uihgf|       Pune|         56|        5|
|    asfsds|        ***|         43|        1|
|    fkwsdf|     Mumbai|         45|        6|
|      gddg|       ABCD|         32|        2|
|     grgzg|     *CSD**|         35|        3|
|     gsrsn|     Howrah|         22|        4|
|     ffafv|        ***|         34|        7|
+----------+-----------+-----------+---------+



3)      Do the same for the other dataset:

i)      val storr = stor.map { p => val f = p.split("\t"); (f(0).toInt, f(1), f(2).toInt, f(3)) }

ii)     storr.take(2)

iii)    val storm = storr.toDF("ID","Name","Temp","Code")

iv)     storm.show





4)      Register both DFs as temp tables (registerTempTable returns Unit, so
there is nothing useful to assign to a val):

station.registerTempTable("Stations")

storm.registerTempTable("Storms")



5)      Join the two tables and filter as per the requirements:

val joinedDF = sqlContext.sql("""
  SELECT Stations.StationName AS StationName, Stations.StationID AS StationID
  FROM Stations INNER JOIN Storms ON Storms.Code = Stations.StationKey
  WHERE Stations.Temparature > 35""")



6)      joinedDF.show

+-----------+---------+
|StationName|StationID|
+-----------+---------+
|       Pune|        5|
+-----------+---------+
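
To make clear what the query in step 5 computes, here is a rough equivalent
on plain Scala collections (no Spark involved). The station rows are copied
from the station.show output above. The storm codes are made up for
illustration, since the Storms data is not shown in this mail, and JoinSketch
is a hypothetical name:

```scala
object JoinSketch {
  // (StationKey, StationName, Temparature, StationID), as shown by station.show
  val stations: Seq[(String, String, Int, Int)] = Seq(
    ("uihgf", "Pune", 56, 5),
    ("asfsds", "***", 43, 1),
    ("fkwsdf", "Mumbai", 45, 6),
    ("gddg", "ABCD", 32, 2),
    ("grgzg", "*CSD**", 35, 3),
    ("gsrsn", "Howrah", 22, 4),
    ("ffafv", "***", 34, 7)
  )

  // Only the Code column of Storms; these values are invented for the example
  val stormCodes: Seq[String] = Seq("uihgf", "gddg")

  // Inner join on Code = StationKey, then keep rows with Temparature > 35
  val joined: Seq[(String, Int)] = for {
    code <- stormCodes
    (key, name, temp, id) <- stations
    if key == code && temp > 35
  } yield (name, id)
}
```

With these made-up codes, joined contains only (Pune,5): gddg also matches a
storm code, but its temperature of 32 fails the filter.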

7)      Save the result as CSV:

joinedDF.coalesce(1).rdd.map(_.mkString(",")).saveAsTextFile("/user/root/spark_demo/scala/data/output/Question6Soln")
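
One caveat with mkString(","): a value that itself contains a comma or a
double quote would break the CSV. It does not matter for this output, but as
a sketch, quoting along the lines of the usual RFC 4180 convention could be
done like this. CsvLine, escape and toCsvLine are hypothetical names, and
writing null as an empty field is my assumption:

```scala
object CsvLine {
  // Quote a field only when it contains a comma, a double quote or a line
  // break; embedded double quotes are doubled.
  def escape(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // Turn one row of values into a CSV line; null becomes an empty field
  // (an assumption made for this sketch).
  def toCsvLine(values: Seq[Any]): String =
    values.map(v => escape(if (v == null) "" else v.toString)).mkString(",")
}
```

It would plug in as joinedDF.coalesce(1).rdd.map(row => CsvLine.toCsvLine(row.toSeq)).
On Spark 2.x, joinedDF.write.csv(...) may be a simpler option.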



Thanks,

Aakash.

On Fri, Feb 17, 2017 at 4:17 PM, Christophe Préaud <
[email protected]> wrote:

> Hi Aakash,
>
> You can try this:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
> val header = Array("col1", "col2", "col3", "col4")
> val schema = StructType(header.map(StructField(_, StringType, true)))
>
> val statRow = stat.map(line => Row(line.split("\t"):_*))
> val df = spark.createDataFrame(statRow, schema)
>
> df.show
> +------+------+----+----+
> |  col1|  col2|col3|col4|
> +------+------+----+----+
> | uihgf| Paris|  56|   5|
> |asfsds|   ***|  43|   1|
> |fkwsdf|London|  45|   6|
> |  gddg|  ABCD|  32|   2|
> | grgzg|  *CSD|  35|   3|
> | gsrsn|  ADR*|  22|   4|
> +------+------+----+----+
>
> Please let me know if this works for you.
>
> Regards,
> Christophe.
>
>
> On 17/02/17 10:37, Aakash Basu wrote:
>
> Hi all,
>
>
> Without using a case class, I tried to create a DF to use later for a join
> and some filtering. But I'm getting an ArrayIndexOutOfBoundsException
> when calling show on the DF.
>
>
> 1)      Importing SQLContext=
>
> import org.apache.spark.sql.SQLContext._
>
> import org.apache.spark.sql.SQLContext
>
>
>
> 2)      Initializing SQLContext=
>
> val sqlContext = new SQLContext(sc)
>
>
>
> 3)      Importing implicits package for toDF conversion=
>
> import sqlContext.implicits._
>
>
>
> 4)      Reading the Station and Storm Files=
>
> val stat = sc.textFile("/user/root/spark_demo/scala/data/Stations.txt")
>
> val stor = sc.textFile("/user/root/spark_demo/scala/data/Storms.txt")
>
>
>
>
>
> stat.foreach(println)
>
>
> uihgf   Paris   56   5
>
> asfsds   ***   43   1
>
> fkwsdf   London   45   6
>
> gddg   ABCD   32   2
>
> grgzg   *CSD   35   3
>
> gsrsn   ADR*   22   4
>
>
> 5) Creating row by segregating columns after reading the tab delimited
> file before converting into DF=
>
>
> val stati = stat.map(x => (x.split("\t")(0), x.split("\t")(1),
> x.split("\t")(2), x.split("\t")(3)))
>
>
>
> 6)      Converting into DF=
>
> val station = stati.toDF()
>
> station.show gives the following error:
>
> 17/02/17 08:46:35 ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 15)
> java.lang.ArrayIndexOutOfBoundsException: 1
>
>
> Please help!
>
> Thanks,
> Aakash.
>
>
>
