Hi folks,

I finally found the root cause of this issue. It can be easily reproduced with the following code. We ran it in standalone mode on 4 instances with 4 cores each (16 cores in total).
```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing

val murmur3 = Hashing.murmur3_32()

// create a Dataset whose second tuple element has 50000 distinct values
val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])
  .repartition(200)
  .map { x =>
    // kill the executor on the first attempt of partition 100 to force a stage retry
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()
// the correct result is 50000, but we always get a smaller number
```

The problem here is that SPARK-23207 uses a sort to make RoundRobinPartitioning always produce the same distribution, but the aggregate function `first` may return non-deterministic results, which makes the sort result non-deterministic as well. Therefore, the original stage attempt and the retried attempt can produce different distributions, which causes both duplicated and lost rows. A sketch of the workarounds we are considering is appended after the quoted message below.

Thanks,
Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:

> Hi folks,
>
> We recently identified a data correctness issue in our pipeline.
>
> The data processing flow is as follows:
> 1. read the current snapshot (provide empty if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat steps 1~4
>
> The simplified version of the code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey,
>       a = if (left.a != null) left.a else right.a,
>       b = if (left.a != null) left.b else right.b,
>       ...
>   )
> }
>
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey))  // generate key
>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>   .map(_._2)                                // drop key
>
> newSnapshot
>   .repartition(60)                          // (1)
>   .write.parquet(newPath)
> ```
>
> The issue we have is that some data were duplicated or lost, and the amounts of
> duplicated and lost data are similar.
>
> We also noticed that this situation only happens if some instances get
> preempted. Spark retries the stage, so some of the partitioned files are
> generated on the first attempt and the other files on the second (retry) attempt.
> Moreover, the duplicated logs are duplicated exactly twice and located in
> both batches (one in the first batch and one in the second batch).
>
> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
> standalone deployment. Workers run on GCP preemptible instances and they
> get preempted very frequently.
>
> The pipeline runs in a single long-running process with multiple threads.
> Each snapshot represents an "hour" of data, and we do the "read-reduce-write"
> operations on multiple snapshots (hours) simultaneously. We are pretty sure the
> same snapshot (hour) is never processed in parallel, and the output path is always
> generated with a timestamp, so those jobs shouldn't affect each other.
>
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the issue
> was gone, but I believe there is still a correctness bug that hasn't been
> reported yet.
>
> We have tried to reproduce this bug on a smaller scale but haven't succeeded yet.
> I have read SPARK-23207 and SPARK-28699, but couldn't find the bug.
> Since this case uses Dataset, I believe it is unrelated to SPARK-24243.
>
> Can anyone give me some advice? Thanks in advance.
>
> Shiao-An Yuan
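
P.S. For anyone who hits the same problem before it is fixed in Spark, below is a minimal sketch of the workarounds we are evaluating, based on the reproduction code at the top of this mail. It assumes the same `spark` session, `spark.implicits._`, and the `ds` Dataset from the repro; the `toDF("key", "firstValue")` column names and the `min` aggregate are introduced only for this sketch and are not part of the original pipeline. This is not a fix for the underlying Spark behaviour, only ways to avoid triggering it.

```
import org.apache.spark.sql.functions._

// (a) Avoid round-robin repartitioning: hash-partition on the deterministic
//     grouping key, so a row's target partition no longer depends on the
//     (non-deterministic) sort order produced before round-robin partitioning.
val countWithKeyPartitioning = ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])
  .toDF("key", "firstValue")       // column names chosen for this sketch
  .repartition(200, $"key")        // hash partitioning instead of round-robin
  .as[(Long, Long)]
  .map(_._2).distinct().count()

// (b) Keep repartition(200) but use a deterministic aggregate (e.g. min instead
//     of first), so the sort inserted by SPARK-23207 should order rows the same
//     way on the original attempt and on the retry.
val countWithDeterministicAgg = ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])
  .repartition(200)
  .map(_._2).distinct().count()
```

Option (a) mirrors the `repartition(100, $"pkey")` change mentioned in the quoted message; option (b) only helps if every non-deterministic expression upstream of the round-robin repartition is removed.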