Hi the tool you are looking for is window function. Example:
>>> df.show() +--------+----+---+------+-------------+ |JoinDate|dept| id|income|income_age_ts| +--------+----+---+------+-------------+ | 4/20/13| ES|101| 19000| 4/20/17| | 4/20/13| OS|101| 10000| 10/3/15| | 4/20/12| DS|102| 13000| 5/9/17| | 4/20/12| CS|102| 12000| 5/8/17| | 4/20/10| EQ|103| 10000| 5/9/17| | 4/20/10| MD|103| 9000| 5/8/17| +--------+----+---+------+-------------+ >>> res = spark.sql("select *, row_number() over (partition by id order by income_age_ts desc) r from t") >>> res.show() +--------+----+---+------+-------------+---+ |JoinDate|dept| id|income|income_age_ts| r| +--------+----+---+------+-------------+---+ | 4/20/10| EQ|103| 10000| 5/9/17| 1| | 4/20/10| MD|103| 9000| 5/8/17| 2| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/13| OS|101| 10000| 10/3/15| 2| | 4/20/12| DS|102| 13000| 5/9/17| 1| | 4/20/12| CS|102| 12000| 5/8/17| 2| +--------+----+---+------+-------------+---+ >>> res = spark.sql("select * from (select *, row_number() over (partition by id order by income_age_ts desc) r from t) x where r=1") >>> res.show() +--------+----+---+------+-------------+---+ |JoinDate|dept| id|income|income_age_ts| r| +--------+----+---+------+-------------+---+ | 4/20/10| EQ|103| 10000| 5/9/17| 1| | 4/20/13| ES|101| 19000| 4/20/17| 1| | 4/20/12| DS|102| 13000| 5/9/17| 1| +--------+----+---+------+-------------+---+ This should be better because it uses all in-built optimizations in Spark. Best Ayan On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com> wrote: > Please click on unnamed text/html link for better view > > On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com> > wrote: > >> >> ---------- Forwarded message --------- >> From: Mamillapalli, Purna Pradeep <PurnaPradeep.Mamillapalli@ >> capitalone.com> >> Date: Tue, Aug 29, 2017 at 8:08 PM >> Subject: Spark question >> To: purna pradeep <purna2prad...@gmail.com> >> >> Below is the input Dataframe(In real this is a very large Dataframe) >> >> >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 101 >> >> 10000 >> >> 10/3/15 >> >> 4/20/13 >> >> OS >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 102 >> >> 12000 >> >> 5/8/17 >> >> 4/20/12 >> >> CS >> >> 103 >> >> 10000 >> >> 5/9/17 >> >> 4/20/10 >> >> EQ >> >> 103 >> >> 9000 >> >> 5/8/15 >> >> 4/20/10 >> >> MD >> >> Get the latest income of an employee which has Income_age ts <10 months >> >> Expected output Dataframe >> >> EmployeeID >> >> INCOME >> >> INCOME AGE TS >> >> JoinDate >> >> Dept >> >> 101 >> >> 19000 >> >> 4/20/17 >> >> 4/20/13 >> >> ES >> >> 102 >> >> 13000 >> >> 5/9/17 >> >> 4/20/12 >> >> DS >> >> 103 >> >> 10000 >> >> 5/9/17 >> >> 4/20/10 >> >> EQ >> >> >> > > > > > > Below is what im planning to implement >> >> >> >> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, >> *JOINDATE*: Int,DEPT:String) >> >> >> >> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( >> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, >> *"Date"*). add(*"DEPT"*,*"String"*) >> >> >> >> *//Reading from the File **import *sparkSession.implicits._ >> >> *val *readEmpFile = sparkSession.read >> .option(*"sep"*, *","*) >> .schema(empSchema) >> .csv(INPUT_DIRECTORY) >> >> >> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] >> >> >> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* >> EmployeeID*) >> >> >> *val *k = groupByDf.mapGroups((key,value) => performETL(value)) >> >> >> >> >> >> *def *performETL(empData: Iterator[employee]) : new employee = { >> >> *val *empList = empData.toList >> >> >> *//calculate income has Logic to figureout latest income for an account >> and returns latest income val *income = calculateIncome(empList) >> >> >> *for *(i <- empList) { >> >> *val *row = i >> >> *return new *employee(row.EmployeeID, row.INCOMEAGE , income) >> } >> *return "Done"* >> >> >> >> } >> >> >> >> Is this a better approach or even the right approach to implement the >> same.If not please suggest a better way to implement the same? >> >> >> >> ------------------------------ >> >> The information contained in this e-mail is confidential and/or >> proprietary to Capital One and/or its affiliates and may only be used >> solely in performance of work or services for Capital One. The information >> transmitted herewith is intended only for use by the individual or entity >> to which it is addressed. If the reader of this message is not the intended >> recipient, you are hereby notified that any review, retransmission, >> dissemination, distribution, copying or other use of, or taking of any >> action in reliance upon this information is strictly prohibited. If you >> have received this communication in error, please contact the sender and >> delete the material from your computer. >> > -- Best Regards, Ayan Guha