Please help.
Anyone, any thoughts on the previous mail?

Thanks
Sudev


On Fri, Dec 9, 2016 at 2:28 PM Sudev A C <sudev...@goibibo.com> wrote:

> Hi,
>
> Can anyone please help clarify how accumulators can be used reliably to
> measure error/success/analytical metrics?
>
> Given below is the use case / code snippet that I have.
>
> val amtZero = sc.accumulator(0)
> val amtLarge = sc.accumulator(0)
> val amtNormal = sc.accumulator(0)
>
> // Classify each row's amount while extracting it; the counts go into the
> // accumulators above. Note that the updates happen inside a transformation.
> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos)) {
>   amtZero.add(1)
>   0.0
> } else {
>   val amount = x.getDouble(somePos)
>   if (amount > 10000) amtLarge.add(1) else amtNormal.add(1)
>   amount
> }
>
> val mrdd = rdd.map(s => (s, getAmount(s)))
> mrdd.cache()
> val another_mrdd = rdd2.map(s => (s, getAmount(s)))
>
> // Pseudocode for the three actions that consume these RDDs.
> mrdd.save_to_redshift
> another_mrdd.save_to_redshift
> mrdd.union(another_mrdd).map(...).groupByKey().save_to_redshift
>
>
>
> // Get the values from the accumulators and persist them to a reliable
> // store for analytics.
> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>
>
>
> A few questions:
>
> 1. How many times should I expect the counts for items within mrdd and
> another_mrdd to be applied, given that both of these RDDs are reused? What
> happens when a part of the DAG is skipped due to caching (say I'm caching
> only mrdd)? (I've added a small sketch of this scenario after the questions.)
>
> 2. Should I be worried about possible stage/task failures (due to
> master-worker network issues, resource starvation, or speculative execution)?
> Can these events lead to wrong counts?
>
> 3. The documentation says: **In transformations, users should be aware of that each
> task’s update may be applied more than once if tasks or job stages are
> re-executed.**
> Does the re-execution of stages/tasks here refer only to re-execution after
> failures, or also to re-execution caused by a stage's/task's position in the DAG?
>
> 4. Is it safe to say that using accumulators for exact counts is limited to
> actions such as .foreach(), since actions guarantee exactly-once updates?
> (A second sketch of that pattern follows below.)
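>
> To make question 1 concrete, here is a minimal, self-contained sketch of what
> I understand the re-evaluation behaviour to be. Assumptions: a local
> SparkContext, the same pre-2.0 sc.accumulator API as in the snippet above,
> and made-up names/data rather than our real job.
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> object AccumulatorReuseSketch {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setAppName("acc-reuse-sketch").setMaster("local[2]"))
>
>     // Accumulators updated inside a transformation, as in getAmount above.
>     val seenUncached = sc.accumulator(0)
>     val seenCached   = sc.accumulator(0)
>     val data = sc.parallelize(1 to 100)
>
>     val tagged = data.map { x => seenUncached.add(1); x }   // not cached
>     tagged.count()                       // first action evaluates the map
>     tagged.count()                       // second action re-evaluates it
>     println(seenUncached.value)          // typically 200 here, not 100
>
>     val cached = data.map { x => seenCached.add(1); x }.cache()
>     cached.count()                       // materializes the cache
>     cached.count()                       // served from cache, no re-evaluation
>     println(seenCached.value)            // typically 100, unless blocks were evicted
>
>     sc.stop()
>   }
> }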
>
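> And for question 4, a sketch of what I think the "safe" pattern looks like,
> with the classification moved into the action itself. Again this assumes a
> local SparkContext and uses made-up Option[Double] sample data in place of
> the real nullable Row column.
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> object AccumulatorInActionSketch {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setAppName("acc-action-sketch").setMaster("local[2]"))
>
>     val amtZero   = sc.accumulator(0)
>     val amtLarge  = sc.accumulator(0)
>     val amtNormal = sc.accumulator(0)
>
>     // Stand-in for the nullable amount column; None plays the role of a null cell.
>     val amounts = sc.parallelize(
>       Seq[Option[Double]](None, Some(5.0), Some(20000.0), Some(42.0)))
>
>     // Updates happen inside an action, so each successful task's updates
>     // should be applied exactly once even if tasks are retried.
>     amounts.foreach {
>       case None                 => amtZero.add(1)
>       case Some(a) if a > 10000 => amtLarge.add(1)
>       case Some(_)              => amtNormal.add(1)
>     }
>
>     println(s"zero=${amtZero.value} large=${amtLarge.value} normal=${amtNormal.value}")
>     sc.stop()
>   }
> }
>
> The downside I see is that the classified amount is no longer reused by the
> rest of the pipeline the way getAmount's return value is, so the work would
> effectively be done twice. Is that the expected trade-off?
>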
> Thanks
> Sudev
>
