Hi all,
I'm incrementing several accumulators inside a foreach. Most of the time,
the accumulators will return the same value for the same dataset. However,
they sometimes differ.
I'm not sure how accumulators are implemented. Could this behavior be caused
by data not arriving before I print out the values?
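Roughly, the pattern is the one in the sketch below (the RDD, the accumulators, and the counts are placeholders, not my actual job; it assumes the spark-shell sc):

// Placeholder sketch, not the real job; assumes the spark-shell `sc`.
val someRDD  = sc.parallelize(1 to 1000000)
val evenAccu = sc.accumulator(0)
val oddAccu  = sc.accumulator(0)

someRDD.foreach { x =>
  if (x % 2 == 0) evenAccu += 1 else oddAccu += 1
}

println((evenAccu.value, oddAccu.value))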
Hello,
I am seeing negative values for accumulators. Here's my implementation in a
standalone app in Spark 1.1.1rc:
implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: Int, t2: BigInt) = BigInt(t1) + t2
  def addInPlace(t1: BigInt, t2: BigInt) = t1
}
To answer my own question,
I was declaring the accumulator incorrectly. The code should look like this:
scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam
scala> :paste
// Entering paste mode (ctrl-D to finish)
implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def zero(initialValue: BigInt) = BigInt(0)
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
}
Similarly, I'm having an issue with the above solution when I use math.min() to add to an accumulator: I'm seeing negative, overflowed numbers again.
The code below works fine without the math.min(), and even if I add an arbitrarily large number like 100, but with math.min() it does not:
// doesn't work
someRDD.foreach(x=>{
> defined module BigIntAccumulatorParam
>
> scala> val accu = sc.accumulator(BigInt(0))
> accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0
>
> scala> accu += 100
>
> scala> accu.value
> res1: scala.math.BigInt = 100
>
>
>
> Best Regards,
> Shixiong Zhu
>
> 2014-11-25 10
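For what it's worth, here is a hypothetical, self-contained sketch (the numbers and names are made up, not my real code) of one way math.min() can reintroduce negative numbers even with the BigInt accumulator: if its arguments are plain Ints, any overflow happens before the value ever reaches the accumulator.

import org.apache.spark.AccumulatorParam

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def zero(initialValue: BigInt) = BigInt(0)
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
}

val accu = sc.accumulator(BigInt(0))   // assumes the spark-shell `sc`

sc.parallelize(Seq(50000)).foreach { x =>
  // x * x = 2500000000 overflows Int and wraps to -1794967296, so
  // math.min returns that negative Int and the BigInt accumulator
  // faithfully records a negative total.
  accu += BigInt(math.min(x * x, 100))
}

println(accu.value)   // negative, even though the intended cap was 100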
Hello!
Does anyone know why I may be receiving negative final accumulator values?
Thanks!
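One common cause (just a guess without seeing your job) is that the accumulator's running total exceeds Int.MaxValue and wraps around to a negative number. A minimal spark-shell sketch with made-up numbers:

// Each update is harmless on its own, but the running total (3000000000)
// does not fit in an Int, so the final value wraps around and goes negative.
val intAccu = sc.accumulator(0)
sc.parallelize(1 to 3).foreach(_ => intAccu += 1000000000)
println(intAccu.value)   // negative

// The same updates through a Long accumulator stay positive.
val longAccu = sc.accumulator(0L)
sc.parallelize(1 to 3).foreach(_ => longAccu += 1000000000L)
println(longAccu.value)  // 3000000000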
Quick update:
It is the filter job that triggers the error above, not the reduceByKey.
Why would a filter cause an out-of-memory error?
Here is my code:
val inputgsup = "hdfs://" + sparkmasterip + "/user/sense/datasets/gsup/binary/30/2014/11/0[1-9]/part*"
val gsupfile = sc.newAPIHadoopFile[BytesWritable, Byte
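The call above got cut off, so here is a rough sketch of the read-and-filter pattern; the value type, the input format, and the predicate are placeholders, not the real job:

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Placeholder sketch: the key/value types, the input format, and the
// predicate are assumptions standing in for the real gsup binary format.
val gsupfile = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
  SequenceFileInputFormat[BytesWritable, BytesWritable]](inputgsup)

val filtered = gsupfile.filter { case (key, value) => value.getLength > 0 }
println(filtered.count())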