Hi,

I am using Spark 2.4.4 in standalone mode.

On Mon, Jan 18, 2021 at 4:26 AM Sean Owen <sro...@gmail.com> wrote:
> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
>
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
>
>> Hi folks,
>>
>> I finally found the root cause of this issue.
>> It can be easily reproduced with the following code.
>> We ran it on a standalone-mode environment with 4 cores * 4 instances (16 cores in total).
>>
>> ```
>> import org.apache.spark.TaskContext
>> import scala.sys.process._
>> import org.apache.spark.sql.functions._
>> import com.google.common.hash.Hashing
>> val murmur3 = Hashing.murmur3_32()
>>
>> // create a Dataset whose second element has a cardinality of 50000
>> val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i/2))
>>
>> ds.groupByKey(_._2)
>>   .agg(first($"_1").as[Long])
>>   .repartition(200)
>>   .map { x =>
>>     if (TaskContext.get.attemptNumber == 0 &&
>>         TaskContext.get.partitionId == 100 &&
>>         TaskContext.get.stageAttemptNumber == 0) {
>>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>>     }
>>     x
>>   }
>>   .map(_._2).distinct().count() // the correct result is 50000, but we always get a smaller number
>> ```
>>
>> The problem here is that SPARK-23207 uses sorting to make RoundRobinPartitioning
>> always generate the same distribution, but the aggregate `first` may return
>> non-deterministic results, which makes the sort order non-deterministic as well.
>> Therefore, the first stage attempt and the retried attempt may produce different
>> distributions, which causes duplicated and lost rows.
>>
>> Thanks,
>> Shiao-An Yuan
>>
>> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> We recently identified a data correctness issue in our pipeline.
>>>
>>> The data processing flow is as follows:
>>> 1. read the current snapshot (provide an empty one if it doesn't exist yet)
>>> 2. read the unprocessed new data
>>> 3. union them and do a `reduceByKey`-style operation
>>> 4. output a new version of the snapshot
>>> 5. repeat steps 1~4
>>>
>>> The simplified version of the code:
>>> ```
>>> // schema
>>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>>
>>> // function for reduce
>>> def merge(left: Log, right: Log): Log = {
>>>   Log(pkey = left.pkey,
>>>       a = if (left.a != null) left.a else right.a,
>>>       b = if (left.a != null) left.b else right.b,
>>>       ...
>>>   )
>>> }
>>>
>>> // a very large parquet file (>10G, 200 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>>   .groupByKey(log => new String(log.pkey))  // generate key
>>>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>>>   .map(_._2)                                // drop key
>>>
>>> newSnapshot
>>>   .repartition(60)                          // (1)
>>>   .write.parquet(newPath)
>>> ```
>>>
>>> The issue we have is that some data is duplicated and some is lost, and the
>>> amounts of duplicated and lost data are similar.
>>>
>>> We also noticed that this only happens when some instances get preempted.
>>> Spark retries the stage, so some of the partitioned files are generated by
>>> the first attempt and the others by the second (retry) attempt. Moreover,
>>> each duplicated log appears exactly twice, once in each batch (one copy in
>>> the first batch and one in the second).
>>>
>>> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
>>> standalone deployment. The workers run on GCP preemptible instances and are
>>> preempted very frequently.
>>>
>>> The pipeline runs in a single long-running, multi-threaded process. Each
>>> snapshot represents an "hour" of data, and we run the "read-reduce-write"
>>> operation on multiple snapshots (hours) simultaneously. We are pretty sure
>>> the same snapshot (hour) is never processed in parallel, and the output path
>>> is always generated with a timestamp, so those jobs shouldn't affect each
>>> other.
>>>
>>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the
>>> issue went away, but I believe there is still a correctness bug that hasn't
>>> been reported yet.
>>>
>>> We have tried to reproduce this bug on a smaller scale but haven't succeeded
>>> yet. I have read SPARK-23207 and SPARK-28699 but couldn't find the bug.
>>> Since this case uses Dataset, I believe it is unrelated to SPARK-24243.
>>>
>>> Can anyone give me some advice on this issue?
>>> Thanks in advance.
>>>
>>> Shiao-An Yuan
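
A concrete form of the key-based repartition workaround mentioned in the Dec 29 message, as a minimal sketch rather than the author's actual pipeline: the `Log` case class, the local master, the partition count, and the output path below are illustrative stand-ins.

```
import org.apache.spark.sql.{Dataset, SparkSession}

object KeyedRepartitionSketch {
  // Stand-in for the thread's 100+ column Log record; `pkey` is the primary key.
  case class Log(pkey: String, a: String, b: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("keyed-repartition-sketch")
      .master("local[4]")                    // illustrative; use the real cluster master
      .getOrCreate()
    import spark.implicits._

    // Placeholder snapshot; in the real pipeline this is the reduced Dataset[Log].
    val newSnapshot: Dataset[Log] = spark.range(0, 1000)
      .map(i => Log(s"key-$i", s"a-$i", (i % 7).toInt))

    newSnapshot
      // Plain repartition(60) uses round-robin partitioning, whose determinism
      // depends on the sort added by SPARK-23207. Hash-partitioning on the
      // primary key does not: each row's target partition depends only on its key.
      .repartition(100, $"pkey")             // partition count is illustrative
      .write.mode("overwrite").parquet("/tmp/new-snapshot")  // hypothetical output path

    spark.stop()
  }
}
```

The trade-off is that hash partitioning gives up round-robin's evenly sized partitions: heavily skewed keys can produce uneven output files, which is presumably why the pipeline used plain `repartition(60)` in the first place.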
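
Separately, the non-determinism could in principle be removed at its source. This is not proposed anywhere in the thread, only an illustrative sketch of the mechanism it describes: replacing the non-deterministic `first` in the reproduction with a deterministic aggregate such as `min` means every stage attempt produces identical rows, so the sort-then-round-robin assignment repeats exactly on a retry. The failure-injection `map` from the reproduction is omitted here; rerunning the full repro on a cluster with this change would be the way to confirm.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.min

object DeterministicAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("deterministic-aggregate-sketch")
      .master("local[4]")                   // illustrative master
      .getOrCreate()
    import spark.implicits._

    // Same shape as the thread's reproduction: 100000 rows, 50000 distinct keys.
    val ds = spark.range(0, 100000, 1, 130).map(i => (i.hashCode, i / 2))

    val result = ds.groupByKey(_._2)
      .agg(min($"_1").as[Int])              // `min` instead of `first`: deterministic per key
      .repartition(200)                     // round-robin now sees identical rows on every attempt
      .map(_._1)                            // keep the key
      .distinct()
      .count()

    println(result)                         // 50000 distinct keys
    spark.stop()
  }
}
```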