I see, as @ayan said, it's valid, but, why don't use API or SQL, the
build-in options are optimized
I understand  that SQL API is hard when trying to build an api over that,
but Spark API doesn't, and you can do a lot with that.

regards,


On Wed, Aug 30, 2017 at 10:31 AM, ayan guha <guha.a...@gmail.com> wrote:

> Well, using raw sql is a valid option, but if you do not want you can
> always implement the concept using apis. All these constructs have api
> counterparts, such as filter, window, over, row number etc.
>
> On Wed, 30 Aug 2017 at 10:49 pm, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> @Andres I need latest but it should less than 10 months based income_age
>> column and don't want to use sql here
>>
>> On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:
>>
>>> Hi, if you need the last value from income in window function you can
>>> use last_value.
>>> No tested but meaby with @ayan sql
>>>
>>> spark.sql("select *, row_number(), last_value(income) over (partition by
>>> id order by income_age_ts desc) r from t")
>>>
>>>
>>> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com
>>> > wrote:
>>>
>>>> @ayan,
>>>>
>>>> Thanks for your response
>>>>
>>>> I would like to have functions in this case  calculateIncome and the
>>>> reason why I need function is to reuse in other parts of the application
>>>> ..that's the reason I'm planning for mapgroups with function as argument
>>>> which takes rowiterator ..but not sure if this is the best to implement as
>>>> my initial dataframe is very large
>>>>
>>>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> the tool you are looking for is window function.  Example:
>>>>>
>>>>> >>> df.show()
>>>>> +--------+----+---+------+-------------+
>>>>> |JoinDate|dept| id|income|income_age_ts|
>>>>> +--------+----+---+------+-------------+
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|
>>>>> | 4/20/13|  OS|101| 10000|      10/3/15|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|
>>>>> | 4/20/12|  CS|102| 12000|       5/8/17|
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|
>>>>> | 4/20/10|  MD|103|  9000|       5/8/17|
>>>>> +--------+----+---+------+-------------+
>>>>>
>>>>> >>> res = spark.sql("select *, row_number() over (partition by id
>>>>> order by income_age_ts desc) r from t")
>>>>> >>> res.show()
>>>>> +--------+----+---+------+-------------+---+
>>>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>>>> +--------+----+---+------+-------------+---+
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>>>> | 4/20/10|  MD|103|  9000|       5/8/17|  2|
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>>>> | 4/20/13|  OS|101| 10000|      10/3/15|  2|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>>>> | 4/20/12|  CS|102| 12000|       5/8/17|  2|
>>>>> +--------+----+---+------+-------------+---+
>>>>>
>>>>> >>> res = spark.sql("select * from (select *, row_number() over
>>>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>>>> >>> res.show()
>>>>> +--------+----+---+------+-------------+---+
>>>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>>>> +--------+----+---+------+-------------+---+
>>>>> | 4/20/10|  EQ|103| 10000|       5/9/17|  1|
>>>>> | 4/20/13|  ES|101| 19000|      4/20/17|  1|
>>>>> | 4/20/12|  DS|102| 13000|       5/9/17|  1|
>>>>> +--------+----+---+------+-------------+---+
>>>>>
>>>>> This should be better because it uses all in-built optimizations in
>>>>> Spark.
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <
>>>>> purna2prad...@gmail.com> wrote:
>>>>>
>>>>>> Please click on unnamed text/html  link for better view
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <
>>>>>> purna2prad...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> ---------- Forwarded message ---------
>>>>>>> From: Mamillapalli, Purna Pradeep <PurnaPradeep.Mamillapalli@
>>>>>>> capitalone.com>
>>>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>>>> Subject: Spark question
>>>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>>>
>>>>>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> EmployeeID
>>>>>>>
>>>>>>> INCOME
>>>>>>>
>>>>>>> INCOME AGE TS
>>>>>>>
>>>>>>> JoinDate
>>>>>>>
>>>>>>> Dept
>>>>>>>
>>>>>>> 101
>>>>>>>
>>>>>>> 19000
>>>>>>>
>>>>>>> 4/20/17
>>>>>>>
>>>>>>> 4/20/13
>>>>>>>
>>>>>>> ES
>>>>>>>
>>>>>>> 101
>>>>>>>
>>>>>>> 10000
>>>>>>>
>>>>>>> 10/3/15
>>>>>>>
>>>>>>> 4/20/13
>>>>>>>
>>>>>>> OS
>>>>>>>
>>>>>>> 102
>>>>>>>
>>>>>>> 13000
>>>>>>>
>>>>>>> 5/9/17
>>>>>>>
>>>>>>> 4/20/12
>>>>>>>
>>>>>>> DS
>>>>>>>
>>>>>>> 102
>>>>>>>
>>>>>>> 12000
>>>>>>>
>>>>>>> 5/8/17
>>>>>>>
>>>>>>> 4/20/12
>>>>>>>
>>>>>>> CS
>>>>>>>
>>>>>>> 103
>>>>>>>
>>>>>>> 10000
>>>>>>>
>>>>>>> 5/9/17
>>>>>>>
>>>>>>> 4/20/10
>>>>>>>
>>>>>>> EQ
>>>>>>>
>>>>>>> 103
>>>>>>>
>>>>>>> 9000
>>>>>>>
>>>>>>> 5/8/15
>>>>>>>
>>>>>>> 4/20/10
>>>>>>>
>>>>>>> MD
>>>>>>>
>>>>>>> Get the latest income of an employee which has  Income_age ts <10
>>>>>>> months
>>>>>>>
>>>>>>> Expected output Dataframe
>>>>>>>
>>>>>>> EmployeeID
>>>>>>>
>>>>>>> INCOME
>>>>>>>
>>>>>>> INCOME AGE TS
>>>>>>>
>>>>>>> JoinDate
>>>>>>>
>>>>>>> Dept
>>>>>>>
>>>>>>> 101
>>>>>>>
>>>>>>> 19000
>>>>>>>
>>>>>>> 4/20/17
>>>>>>>
>>>>>>> 4/20/13
>>>>>>>
>>>>>>> ES
>>>>>>>
>>>>>>> 102
>>>>>>>
>>>>>>> 13000
>>>>>>>
>>>>>>> 5/9/17
>>>>>>>
>>>>>>> 4/20/12
>>>>>>>
>>>>>>> DS
>>>>>>>
>>>>>>> 103
>>>>>>>
>>>>>>> 10000
>>>>>>>
>>>>>>> 5/9/17
>>>>>>>
>>>>>>> 4/20/10
>>>>>>>
>>>>>>> EQ
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Below is what im planning to implement
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE:
>>>>>>> Int, *JOINDATE*: Int,DEPT:String)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*
>>>>>>> ).add(*"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(
>>>>>>> *"JOINDATE"*,*"Date"*). add(*"DEPT"*,*"String"*)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *//Reading from the File **import *sparkSession.implicits._
>>>>>>>
>>>>>>> *val *readEmpFile = sparkSession.read
>>>>>>>   .option(*"sep"*, *","*)
>>>>>>>   .schema(empSchema)
>>>>>>>   .csv(INPUT_DIRECTORY)
>>>>>>>
>>>>>>>
>>>>>>> *//Create employee DataFrame **val *custDf =
>>>>>>> readEmpFile.as[employee]
>>>>>>>
>>>>>>>
>>>>>>> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.*
>>>>>>> EmployeeID*)
>>>>>>>
>>>>>>>
>>>>>>> *val *k = groupByDf.mapGroups((key,value) => performETL(value))
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *def *performETL(empData: Iterator[employee]) : new employee  = {
>>>>>>>
>>>>>>>   *val *empList = empData.toList
>>>>>>>
>>>>>>>
>>>>>>> *//calculate income has Logic to figureout latest income for an
>>>>>>> account and returns latest income   val *income =
>>>>>>> calculateIncome(empList)
>>>>>>>
>>>>>>>
>>>>>>>   *for *(i <- empList) {
>>>>>>>
>>>>>>>       *val *row = i
>>>>>>>
>>>>>>> *return new *employee(row.EmployeeID, row.INCOMEAGE , income)
>>>>>>>   }
>>>>>>>   *return  "Done"*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Is this a better approach or even the right approach to implement
>>>>>>> the same.If not please suggest a better way to implement the same?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>>
>>>>>>> The information contained in this e-mail is confidential and/or
>>>>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>>>>> solely in performance of work or services for Capital One. The 
>>>>>>> information
>>>>>>> transmitted herewith is intended only for use by the individual or 
>>>>>>> entity
>>>>>>> to which it is addressed. If the reader of this message is not the 
>>>>>>> intended
>>>>>>> recipient, you are hereby notified that any review, retransmission,
>>>>>>> dissemination, distribution, copying or other use of, or taking of any
>>>>>>> action in reliance upon this information is strictly prohibited. If you
>>>>>>> have received this communication in error, please contact the sender and
>>>>>>> delete the material from your computer.
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>> --
> Best Regards,
> Ayan Guha
>



-- 
Ing. Ivaldi Andres

Reply via email to