Hi Shiao-An Yuan,

I also ran into this correctness problem in my production environment. My Spark version is 2.3.1. I previously thought it was caused by SPARK-23243, but you said you also hit this problem in your environment, and your version is 2.4.4, which already contains the fix for SPARK-23243. So maybe this problem is not caused by SPARK-23243.

As you said, if it is caused by `first` before `repartition`, how can this problem be solved fundamentally? And is there any workaround?
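For what it's worth, the only mitigations I can see are the ones you already mention further down in the thread: avoid the round-robin `repartition(n)` that runs on the output of the non-deterministic aggregate, either by using `coalesce` (no shuffle) or by hash-partitioning on the key. Here is a minimal sketch against your simplified pipeline (assuming the same `newSnapshot` / `pkey` schema from your quoted mail and `spark.implicits._` in scope; I have not verified that this removes the root cause, it only avoids the round-robin redistribution):

```
// original line (1) from the quoted code: round-robin repartition, whose
// retry-stability relies on sorting deterministic input rows (SPARK-23207)
// newSnapshot.repartition(60).write.parquet(newPath)

// mitigation A: coalesce instead of repartition -- no extra shuffle, so there
// is no round-robin redistribution to redo differently on a stage retry
newSnapshot.coalesce(60).write.parquet(newPath)

// mitigation B: hash-partition by the primary key -- each row is routed by its
// key rather than by its position, so a retried stage produces the same layout
newSnapshot.repartition(100, $"pkey").write.parquet(newPath)
```

(A second sketch, about making the aggregate itself deterministic, is below the quoted thread at the end of this mail.)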
> On Jan 18, 2021, at 10:35 AM, Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
>
> Hi,
> I am using Spark 2.4.4 standalone mode.
>
> On Mon, Jan 18, 2021 at 4:26 AM Sean Owen <sro...@gmail.com> wrote:
> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
>
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
> Hi folks,
>
> I finally found the root cause of this issue.
> It can be easily reproduced with the following code.
> We ran it on a standalone-mode environment with 4 cores * 4 instances (16 cores in total).
>
> ```
> import org.apache.spark.TaskContext
> import scala.sys.process._
> import org.apache.spark.sql.functions._
> import com.google.common.hash.Hashing
> val murmur3 = Hashing.murmur3_32()
>
> // create a Dataset whose second element has a cardinality of 50000
> val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i/2))
>
> ds.groupByKey(_._2)
>   .agg(first($"_1").as[Long])
>   .repartition(200)
>   .map { x =>
>     // kill the executor on the first attempt of partition 100 to force a stage retry
>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 && TaskContext.get.stageAttemptNumber == 0) {
>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>     }
>     x
>   }
>   .map(_._2).distinct().count() // the correct result is 50000, but we always get a smaller number
> ```
>
> The problem here is that SPARK-23207 uses sorting to make RoundRobinPartitioning always generate the same distribution,
> but the UDAF `first` may return non-deterministic results and make the sorting result non-deterministic.
> Therefore, the first stage and the retried stage may produce different distributions, which causes duplication and loss.
>
> Thanks,
> Shiao-An Yuan
>
> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
> Hi folks,
>
> We recently identified a data correctness issue in our pipeline.
>
> The data processing flow is as follows:
> 1. read the current snapshot (provide empty if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat steps 1~4
>
> The simplified version of the code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey,
>       a = if (left.a != null) left.a else right.a,
>       b = if (left.a != null) left.b else right.b,
>       ...
>   )
> }
>
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey)) // generate key
>   .reduceGroups(merge(_, _))               // spark.sql.shuffle.partitions=200
>   .map(_._2)                               // drop key
>
> newSnapshot
>   .repartition(60)                         // (1)
>   .write.parquet(newPath)
> ```
>
> The issue we have is that some data were duplicated or lost, and the amounts of duplicated and lost data are similar.
>
> We also noticed that this situation only happens when some instances get preempted. Spark retries the stage, so some of the partitioned files are generated by the 1st attempt and the other files are generated by the 2nd (retry) attempt.
> Moreover, those duplicated logs are duplicated exactly twice and located in both batches (one in the first batch and one in the second batch).
>
> The input/output files are parquet on GCS. The Spark version is 2.4.4 with standalone deployment. The workers run on GCP preemptible instances and they get preempted very frequently.
>
> The pipeline runs in a single long-running process with multiple threads. Each snapshot represents an "hour" of data, and we do the "read-reduce-write" operations on multiple snapshots (hours) simultaneously. We are pretty sure the same snapshot (hour) is never processed in parallel, and the output path is always generated with a timestamp, so those jobs shouldn't affect each other.
>
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the issue was gone, but I believe there is still a correctness bug that hasn't been reported yet.
>
> We have tried to reproduce this bug on a smaller scale but haven't succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find the bug.
> Since this case is a Dataset, I believe it is unrelated to SPARK-24243.
>
> Can anyone give me some advice on this issue?
> Thanks in advance.
>
> Shiao-An Yuan
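Going back to your minimal reproduction: if I understand the root cause correctly, anything that makes the rows feeding the round-robin `repartition(200)` deterministic should also make the retried stage consistent. Below is a small sketch of that idea (assuming the same `ds` as in your reproduction and spark-shell implicits in scope). It swaps the non-deterministic `first` for a deterministic aggregate such as `min`, which is only possible when any single value per key is acceptable, so this is an illustration of the cause rather than a general fix:

```
import org.apache.spark.sql.functions._

// `first($"_1")` may pick a different value per key when the stage is retried;
// `min($"_1")` always yields the same value, so the sort that SPARK-23207
// performs before RoundRobinPartitioning sees identical rows on every attempt.
ds.groupByKey(_._2)
  .agg(min($"_1").as[Int])   // deterministic aggregate instead of first()
  .repartition(200)
  .map(_._2)
  .distinct()
  .count()                   // with deterministic input rows, a stage retry
                             // should redistribute them identically
```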