Hi,

The tool you are looking for is a window function. Example:

>>> df.show()
+--------+----+---+------+-------------+
|JoinDate|dept| id|income|income_age_ts|
+--------+----+---+------+-------------+
| 4/20/13|  ES|101| 19000|      4/20/17|
| 4/20/13|  OS|101| 10000|      10/3/15|
| 4/20/12|  DS|102| 13000|       5/9/17|
| 4/20/12|  CS|102| 12000|       5/8/17|
| 4/20/10|  EQ|103| 10000|       5/9/17|
| 4/20/10|  MD|103|  9000|       5/8/17|
+--------+----+---+------+-------------+

>>> df.createOrReplaceTempView("t")
>>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103| 10000|       5/9/17|  1|
| 4/20/10|  MD|103|  9000|       5/8/17|  2|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/13|  OS|101| 10000|      10/3/15|  2|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
| 4/20/12|  CS|102| 12000|       5/8/17|  2|
+--------+----+---+------+-------------+---+

>>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> res.show()
+--------+----+---+------+-------------+---+
|JoinDate|dept| id|income|income_age_ts|  r|
+--------+----+---+------+-------------+---+
| 4/20/10|  EQ|103| 10000|       5/9/17|  1|
| 4/20/13|  ES|101| 19000|      4/20/17|  1|
| 4/20/12|  DS|102| 13000|       5/9/17|  1|
+--------+----+---+------+-------------+---+

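This should perform better than the groupByKey/mapGroups approach, because the
window function is planned and executed by Spark SQL's built-in optimizations
instead of running as opaque user code.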
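
One caveat with the queries above: income_age_ts is displayed in M/d/yy form,
which suggests it is a string column, so "order by income_age_ts desc" would
compare text rather than dates. That happens to give the right answer on this
sample, but it is not guaranteed in general. The plain-Python sketch below is
not Spark code; it only mimics "row_number() ... where r=1" on the sample rows
with properly parsed dates. The helper names parse_ts and latest_per_id are
made up for illustration.

```python
from datetime import datetime

# Sample rows copied from the df.show() output:
# (JoinDate, dept, id, income, income_age_ts)
ROWS = [
    ("4/20/13", "ES", 101, 19000, "4/20/17"),
    ("4/20/13", "OS", 101, 10000, "10/3/15"),
    ("4/20/12", "DS", 102, 13000, "5/9/17"),
    ("4/20/12", "CS", 102, 12000, "5/8/17"),
    ("4/20/10", "EQ", 103, 10000, "5/9/17"),
    ("4/20/10", "MD", 103, 9000, "5/8/17"),
]


def parse_ts(text):
    """Parse an M/d/yy string such as '4/20/17' into a datetime."""
    return datetime.strptime(text, "%m/%d/%y")


def latest_per_id(rows):
    """Keep, for each id, the row with the most recent income_age_ts."""
    latest = {}
    for row in rows:
        emp_id, ts = row[2], parse_ts(row[4])
        if emp_id not in latest or ts > parse_ts(latest[emp_id][4]):
            latest[emp_id] = row
    return latest


if __name__ == "__main__":
    for emp_id, row in sorted(latest_per_id(ROWS).items()):
        print(emp_id, row)

    # Text comparison and date comparison can disagree on M/d/yy values.
    pair = ["10/3/17", "5/9/17"]
    print(max(pair))                # compared as text
    print(max(pair, key=parse_ts))  # compared as dates
```

On the pair at the end, the text comparison picks "5/9/17" while the date
comparison picks "10/3/17". If the real column is a string, converting it to a
proper date type before ordering avoids that trap.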

Best
Ayan

On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
wrote:

> Please click on unnamed text/html  link for better view
>
> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>>
>> ---------- Forwarded message ---------
>> From: Mamillapalli, Purna Pradeep <PurnaPradeep.Mamillapalli@capitalone.com>
>> Date: Tue, Aug 29, 2017 at 8:08 PM
>> Subject: Spark question
>> To: purna pradeep <purna2prad...@gmail.com>
>>
>> Below is the input DataFrame (in reality this is a very large DataFrame)
>>
>>
>>
>> +----------+------+-------------+--------+----+
>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>> +----------+------+-------------+--------+----+
>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>> |       101| 10000|      10/3/15| 4/20/13|  OS|
>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>> |       102| 12000|       5/8/17| 4/20/12|  CS|
>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>> |       103|  9000|       5/8/15| 4/20/10|  MD|
>> +----------+------+-------------+--------+----+
>>
>> Get the latest income of each employee whose INCOME AGE TS is less than
>> 10 months old
>>
>> Expected output Dataframe
>>
>> +----------+------+-------------+--------+----+
>> |EmployeeID|INCOME|INCOME AGE TS|JoinDate|Dept|
>> +----------+------+-------------+--------+----+
>> |       101| 19000|      4/20/17| 4/20/13|  ES|
>> |       102| 13000|       5/9/17| 4/20/12|  DS|
>> |       103| 10000|       5/9/17| 4/20/10|  EQ|
>> +----------+------+-------------+--------+----+
>>
>>
>>
>
> Below is what I'm planning to implement:
>>
>>
>>
>> import java.sql.Date
>> import org.apache.spark.sql.types.StructType
>>
>> // Field types match the schema below (dates, not Int)
>> case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Date,
>>                     JOINDATE: Date, DEPT: String)
>>
>> val empSchema = new StructType().add("EmployeeID", "int").add("INCOME", "int")
>>   .add("INCOMEAGE", "date").add("JOINDATE", "date").add("DEPT", "string")
>>
>> // Reading from the file
>> import sparkSession.implicits._
>>
>> val readEmpFile = sparkSession.read
>>   .option("sep", ",")
>>   .schema(empSchema)
>>   .csv(INPUT_DIRECTORY)
>>
>> // Create employee Dataset
>> val custDf = readEmpFile.as[employee]
>>
>> // Adding Salary Column
>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>
>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>
>> def performETL(empData: Iterator[employee]): employee = {
>>   val empList = empData.toList
>>
>>   // calculateIncome (not shown here) has the logic to figure out the
>>   // latest income for an account and returns the latest income
>>   val income = calculateIncome(empList)
>>
>>   // Return the first record of the group with the latest income filled in
>>   empList.head.copy(INCOME = income)
>> }
>>
>>
>>
>> Is this a good approach, or even the right approach, to implement this?
>> If not, please suggest a better way.
>>
>>
>>
>>
>


-- 
Best Regards,
Ayan Guha
