Hi, I may be wrong, but this looks like a massively complicated solution for what could have been a simple SQL query.
It always seems better to me to first reduce the complexity and then solve the problem, rather than solve a problem which should not even exist in the first place.

Regards,
Gourav

On Sun, Jan 17, 2021 at 12:22 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:

> Hi folks,
>
> I finally found the root cause of this issue.
> It can be easily reproduced by the following code.
> We ran it in standalone mode with 4 cores * 4 instances (16 cores in total).
>
> ```
> import org.apache.spark.TaskContext
> import scala.sys.process._
> import org.apache.spark.sql.functions._
> import com.google.common.hash.Hashing
>
> val murmur3 = Hashing.murmur3_32()
>
> // create a Dataset whose second element has a cardinality of 50000
> val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i / 2))
>
> ds.groupByKey(_._2)
>   .agg(first($"_1").as[Long])
>   .repartition(200)
>   .map { x =>
>     // kill an executor during the first attempt of the post-shuffle stage
>     if (TaskContext.get.attemptNumber == 0 &&
>         TaskContext.get.partitionId == 100 &&
>         TaskContext.get.stageAttemptNumber == 0) {
>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>     }
>     x
>   }
>   .map(_._2).distinct().count() // the correct result is 50000, but we always get a smaller number
> ```
>
> The problem here is that SPARK-23207 uses sorting to make RoundRobinPartitioning always generate the same distribution, but the aggregate function `first` may return non-deterministic results, which makes the sorting result non-deterministic as well. Therefore, the first stage attempt and the retry attempt can produce different distributions, causing both duplicated and lost rows.
>
> Thanks,
> Shiao-An Yuan
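If that is indeed the root cause, one untested way to sidestep it in the reproduction above (my own sketch, not something verified in this thread) is to swap the order-dependent `first` for a deterministic aggregate such as `min`, so that the sort SPARK-23207 adds before the round-robin shuffle sees the same input on every stage attempt:

```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing

val murmur3 = Hashing.murmur3_32()
val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])       // changed from first(): min() is order-independent per group
  .repartition(200)
  .map { x =>                     // same executor-kill trick as in the quoted reproduction
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()   // expected to stay at 50000 even after the stage retry
```

With `min` the value emitted for each group no longer depends on the order in which shuffled rows arrive, so the original attempt and the retry should sort and distribute the rows identically.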
> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
>
>> Hi folks,
>>
>> We recently identified a data correctness issue in our pipeline.
>>
>> The data processing flow is as follows:
>> 1. read the current snapshot (provide an empty one if it doesn't exist yet)
>> 2. read the unprocessed new data
>> 3. union them and do a `reduceByKey` operation
>> 4. output a new version of the snapshot
>> 5. repeat steps 1~4
>>
>> The simplified version of the code:
>> ```
>> // schema
>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>
>> // function for reduce
>> def merge(left: Log, right: Log): Log = {
>>   Log(
>>     pkey = left.pkey,
>>     a = if (left.a != null) left.a else right.a,
>>     b = if (left.a != null) left.b else right.b,
>>     ...
>>   )
>> }
>>
>> // a very large parquet file (>10G, 200 partitions)
>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>
>> // multiple small parquet files
>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>
>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>   .groupByKey(log => new String(log.pkey)) // generate key
>>   .reduceGroups(merge _)                   // spark.sql.shuffle.partitions=200
>>   .map(_._2)                               // drop key
>>
>> newSnapshot
>>   .repartition(60)                         // (1)
>>   .write.parquet(newPath)
>> ```
>>
>> The issue we have is that some data were duplicated or lost, and the amounts of duplicated and lost data are similar.
>>
>> We also noticed that this only happens when some instances get preempted. Spark retries the stage, so some of the partitioned files are generated by the first attempt and the others by the second (retry) attempt. Moreover, the duplicated logs are duplicated exactly twice and located in both batches (one in the first batch; one in the second batch).
>>
>> The input/output files are parquet on GCS. The Spark version is 2.4.4 with standalone deployment. Workers run on GCP preemptible instances and are preempted very frequently.
>>
>> The pipeline runs in a single long-running, multi-threaded process. Each snapshot represents an "hour" of data, and we do the "read-reduce-write" operations on multiple snapshots (hours) simultaneously. We are pretty sure the same snapshot (hour) is never processed in parallel, and the output path is always generated with a timestamp, so those jobs shouldn't affect each other.
>>
>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the issue was gone, but I believe there is still a correctness bug that hasn't been reported yet.
>>
>> We have tried to reproduce this bug on a smaller scale but haven't succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find the bug. Since this case uses the Dataset API, I believe it is unrelated to SPARK-24243.
>>
>> Can anyone give me some advice on this issue?
>> Thanks in advance.
>>
>> Shiao-An Yuan
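For reference, a minimal, untested sketch of the workaround mentioned in the quoted message (it reuses the `newSnapshot` / `newPath` names and the `pkey` column from the quoted pipeline code):

```
// Hash-partitioning by the primary key removes the round-robin shuffle whose
// retried stage can redistribute rows differently.
newSnapshot
  .repartition(100, $"pkey")   // the target partition depends only on the key, so a retry is deterministic
  .write.parquet(newPath)

// alternatively, as also mentioned above, keep the 60 output files but skip
// the extra shuffle entirely:
// newSnapshot.coalesce(60).write.parquet(newPath)
```

Either variant avoids RoundRobinPartitioning, which is the only partitioning in this pipeline that depends on the (non-deterministic) row order.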