Please help. Anyone, any thoughts on the previous mail?

Thanks,
Sudev
On Fri, Dec 9, 2016 at 2:28 PM Sudev A C <sudev...@goibibo.com> wrote:

> Hi,
>
> Can anyone please clarify how accumulators can be used reliably to
> measure error/success/analytical metrics?
>
> Given below is the use case / code snippet that I have.
>
> val amtZero = sc.accumulator(0)
> val amtLarge = sc.accumulator(0)
> val amtNormal = sc.accumulator(0)
>
> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos)) {
>   amtZero.add(1)
>   0.0
> } else {
>   val amount = x.getDouble(4)
>   if (amount > 10000) amtLarge.add(1) else amtNormal.add(1)
>   amount
> }
>
> val mrdd = rdd.map(s => (s, getAmount(s)))
> mrdd.cache()
> val another_mrdd = rdd2.map(s => (s, getAmount(s)))
>
> mrdd.save_to_redshift
> another_mrdd.save_to_redshift
> mrdd.union(another_mrdd).map(...).groupByKey().save_to_redshift
>
> // Get values from accumulators and persist them to a reliable store for analytics.
> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>
> A few questions:
>
> 1. How many times should I expect the counts for items within mrdd and
> another_mrdd, since both of these RDDs are being reused? What happens when
> a part of the DAG is skipped due to caching in between (say I'm caching
> only mrdd)?
>
> 2. Should I be worried about possible stage/task failures (due to
> master-worker network issues, resource starvation, or speculative execution)?
> Can these events lead to wrong counts?
>
> 3. The documentation says: **In transformations, users should be aware that each
> task's update may be applied more than once if tasks or job stages are
> re-executed.**
> Does re-execution of stages/tasks here refer only to re-execution after failures,
> or also to re-execution caused by the stage's/task's position in the DAG?
>
> 4. Is it safe to say that the use of accumulators (for exact counts) is
> limited to actions such as .foreach(), since actions guarantee exactly-once updates?
>
> Thanks
> Sudev
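
To make questions 1 and 4 concrete, here is a minimal, self-contained sketch of the behaviour I am asking about. It is not my production code: it assumes a local SparkContext and a toy RDD of integers instead of the Rows above, and just contrasts counting inside a transformation with counting inside an action.

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("acc-sketch").setMaster("local[*]"))

    val data = sc.parallelize(1 to 100)

    // Counting inside a transformation: the update runs every time the
    // partition is computed, i.e. once per action that forces it, and again
    // on task retries or speculative duplicates, unless the cached copy is hit.
    val inMap = sc.accumulator(0)
    val mapped = data.map { x => inMap.add(1); x * 2 }

    mapped.count()   // first action: inMap is about 100
    mapped.count()   // second action recomputes 'mapped': inMap is about 200

    // Counting inside an action: Spark applies each task's update exactly
    // once, even if the task is retried or speculatively duplicated.
    val inForeach = sc.accumulator(0)
    mapped.foreach(_ => inForeach.add(1))   // inForeach == 100

    println(s"inMap = ${inMap.value}, inForeach = ${inForeach.value}")
    sc.stop()
  }
}

If `mapped.cache()` were added before the two count() calls and the cached partitions were actually reused, the second action would skip the map stage and inMap would stay around 100, which is exactly the caching/DAG-skipping case I am unsure about in question 1.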