I see. As @ayan said, it's valid, but why not use the API or SQL? The
built-in options are optimized. I understand that the SQL API is awkward
when you are trying to build an API on top of it, but the Spark DataFrame
API isn't, and you can do a lot with it.
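For what it's worth, a rough, untested sketch of the same row_number query
expressed with the DataFrame API instead of raw SQL. The SparkSession setup
and hard-coded sample rows (copied from ayan's example further down) are
only there to make the sketch self-contained; it orders by the raw
income_age_ts string, as in the example, although for true chronological
order the column should first be parsed with to_date.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("latest-income").getOrCreate()
import spark.implicits._

// Sample rows from the thread below.
val df = Seq(
  ("4/20/13", "ES", 101, 19000, "4/20/17"),
  ("4/20/13", "OS", 101, 10000, "10/3/15"),
  ("4/20/12", "DS", 102, 13000, "5/9/17"),
  ("4/20/12", "CS", 102, 12000, "5/8/17"),
  ("4/20/10", "EQ", 103, 10000, "5/9/17"),
  ("4/20/10", "MD", 103, 9000,  "5/8/17")
).toDF("JoinDate", "dept", "id", "income", "income_age_ts")

// Rank each employee's rows with the newest income_age_ts first,
// then keep only the top-ranked row per id.
val w = Window.partitionBy("id").orderBy(col("income_age_ts").desc)
val latest = df
  .withColumn("r", row_number().over(w))
  .filter(col("r") === 1)
  .drop("r")

latest.show()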
Regards,

On Wed, Aug 30, 2017 at 10:31 AM, ayan guha <guha.a...@gmail.com> wrote:

> Well, using raw SQL is a valid option, but if you do not want to, you can
> always implement the concept using the APIs. All these constructs have API
> counterparts, such as filter, window, over, row_number, etc.
>
> On Wed, 30 Aug 2017 at 10:49 pm, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> @Andres I need the latest one, but it should be less than 10 months old
>> based on the income_age column, and I don't want to use SQL here.
>>
>> On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:
>>
>>> Hi, if you need the last value of income in a window function you can
>>> use last_value.
>>> Not tested, but maybe something like this with @ayan's SQL:
>>>
>>> spark.sql("select *, row_number(), last_value(income) over (partition by
>>> id order by income_age_ts desc) r from t")
>>>
>>> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> @ayan,
>>>>
>>>> Thanks for your response.
>>>>
>>>> I would like to use functions here, in this case calculateIncome, and
>>>> the reason I need a function is to reuse it in other parts of the
>>>> application. That's why I'm planning to use mapGroups with the function
>>>> as an argument, which takes a row iterator. But I'm not sure this is
>>>> the best way to implement it, as my initial DataFrame is very large.
>>>>
>>>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> the tool you are looking for is a window function. Example:
>>>>>
>>>>> >>> df.show()
>>>>> +--------+----+---+------+-------------+
>>>>> |JoinDate|dept| id|income|income_age_ts|
>>>>> +--------+----+---+------+-------------+
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|
>>>>> | 4/20/13|  OS|101| 10000|      10/3/15|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|
>>>>> | 4/20/12|  CS|102| 12000|       5/8/17|
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>>>>> | 4/20/10|  MD|103|  9000|       5/8/17|
>>>>> +--------+----+---+------+-------------+
>>>>>
>>>>> >>> res = spark.sql("select *, row_number() over (partition by id
>>>>> order by income_age_ts desc) r from t")
>>>>> >>> res.show()
>>>>> +--------+----+---+------+-------------+---+
>>>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>>>> +--------+----+---+------+-------------+---+
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>>>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>>>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>>>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>>>>> +--------+----+---+------+-------------+---+
>>>>>
>>>>> >>> res = spark.sql("select * from (select *, row_number() over
>>>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>>>> >>> res.show()
>>>>> +--------+----+---+------+-------------+---+
>>>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>>>> +--------+----+---+------+-------------+---+
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>>>> +--------+----+---+------+-------------+---+
>>>>>
>>>>> This should be better because it uses all the built-in optimizations
>>>>> in Spark.
>>>>>
>>>>> Best,
>>>>> Ayan
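Purna's constraint further up in the thread is that only incomes younger
than 10 months should count. Below is a rough, untested sketch of how that
filter might be combined with the same row_number window via the DataFrame
API; the df variable, the M/d/yy date pattern, and Spark 2.2+ (for the
to_date(column, format) overload) are assumptions, not something stated in
the thread.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{add_months, col, current_date, row_number, to_date}

// Parse the string timestamp and keep only rows at most 10 months old
// (assumes income_age_ts is stored as "M/d/yy"; adjust the pattern if needed).
val recent = df
  .withColumn("income_age_date", to_date(col("income_age_ts"), "M/d/yy"))
  .filter(col("income_age_date") >= add_months(current_date(), -10))

// Then pick the latest remaining record per employee, as in the SQL above.
val w = Window.partitionBy("id").orderBy(col("income_age_date").desc)
val latestRecent = recent
  .withColumn("r", row_number().over(w))
  .filter(col("r") === 1)
  .drop("r", "income_age_date")

latestRecent.show()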
>>>>>
>>>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Please click on the unnamed text/html link for a better view.
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Mamillapalli, Purna Pradeep
>>>>>>> <PurnaPradeep.Mamillapalli@capitalone.com>
>>>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>>>> Subject: Spark question
>>>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>>>
>>>>>>> Below is the input DataFrame (in reality this is a very large
>>>>>>> DataFrame):
>>>>>>>
>>>>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>>>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>>>>>> 101        | 10000  | 10/3/15       | 4/20/13  | OS
>>>>>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>>>>>> 102        | 12000  | 5/8/17        | 4/20/12  | CS
>>>>>>> 103        | 10000  | 5/9/17        | 4/20/10  | EQ
>>>>>>> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>>>>>>>
>>>>>>> Get the latest income of each employee, where the income's
>>>>>>> INCOME AGE TS is less than 10 months old.
>>>>>>>
>>>>>>> Expected output DataFrame:
>>>>>>>
>>>>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>>>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>>>>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>>>>>> 103        | 10000  | 5/9/17        | 4/20/10  | EQ
>>>>>>>
>>>>>>> Below is what I'm planning to implement:
>>>>>>>
>>>>>>> case class employee(EmployeeID: Int, INCOME: Int,
>>>>>>>                     INCOMEAGE: java.sql.Date,
>>>>>>>                     JOINDATE: java.sql.Date, DEPT: String)
>>>>>>>
>>>>>>> val empSchema = new StructType()
>>>>>>>   .add("EmployeeID", "Int")
>>>>>>>   .add("INCOME", "Int")
>>>>>>>   .add("INCOMEAGE", "Date")
>>>>>>>   .add("JOINDATE", "Date")
>>>>>>>   .add("DEPT", "String")
>>>>>>>
>>>>>>> // Read the employee records from the CSV file.
>>>>>>> import sparkSession.implicits._
>>>>>>>
>>>>>>> val readEmpFile = sparkSession.read
>>>>>>>   .option("sep", ",")
>>>>>>>   .schema(empSchema)
>>>>>>>   .csv(INPUT_DIRECTORY)
>>>>>>>
>>>>>>> // Create the typed employee Dataset.
>>>>>>> val custDf = readEmpFile.as[employee]
>>>>>>>
>>>>>>> // Group the records by employee.
>>>>>>> val groupByDf = custDf.groupByKey(a => a.EmployeeID)
>>>>>>>
>>>>>>> val k = groupByDf.mapGroups((key, value) => performETL(value))
>>>>>>>
>>>>>>> def performETL(empData: Iterator[employee]): employee = {
>>>>>>>   val empList = empData.toList
>>>>>>>
>>>>>>>   // calculateIncome holds the logic to figure out the latest income
>>>>>>>   // for an employee and returns that income.
>>>>>>>   val income = calculateIncome(empList)
>>>>>>>
>>>>>>>   // Build the result record from the first row of the group,
>>>>>>>   // replacing its income with the computed latest income.
>>>>>>>   val row = empList.head
>>>>>>>   employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
>>>>>>> }
>>>>>>>
>>>>>>> Is this a better approach, or even the right approach, to implement
>>>>>>> this? If not, please suggest a better way to implement the same.
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>
>>> --
>>> Ing. Ivaldi Andres
>
> --
> Best Regards,
> Ayan Guha

--
Ing. Ivaldi Andres