Hi Sean,

Sorry, I didn't describe it clearly. The column "pkey" is like a "primary key", and I do a "reduce by key" on this column, so the number of rows should always equal the cardinality of "pkey". When I say data got duplicated & lost, I mean duplicated "pkey" values exist in the output file (after "reduce by key") and some "pkey" values are missing. Since it only happens when executors are preempted, I believe this is a bug (nondeterministic shuffle) that SPARK-23207 is trying to solve.
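To make that invariant concrete, this is roughly the kind of check we expect to hold on every output snapshot (just a sketch; `outputPath` stands in for the real timestamped output path):

```
import spark.implicits._

val out = spark.read.schema(schema).parquet(outputPath).as[Log]

// After the reduce, every pkey should appear exactly once,
// so the row count and the distinct-pkey count should match.
val totalRows     = out.count()
val distinctPkeys = out.map(log => new String(log.pkey)).distinct().count()
assert(totalRows == distinctPkeys,
  s"pkey invariant violated: rows=$totalRows, distinct pkeys=$distinctPkeys")
```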
Thanks,
Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:53 PM Sean Owen <sro...@gmail.com> wrote:

> Total guess here, but your key is a case class. It does define hashCode
> and equals for you, but you have an array as one of the members. Array
> equality is by reference, so two arrays of the same elements are not
> equal. You may have to define hashCode and equals manually to make them
> correct.
>
> On Tue, Dec 29, 2020 at 8:01 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> We recently identified a data correctness issue in our pipeline.
>>
>> The data processing flow is as follows:
>> 1. read the current snapshot (provide empty if it doesn't exist yet)
>> 2. read unprocessed new data
>> 3. union them and do a `reduceByKey` operation
>> 4. output a new version of the snapshot
>> 5. repeat steps 1~4
>>
>> A simplified version of the code:
>> ```
>> // schema
>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>
>> // function for reduce
>> def merge(left: Log, right: Log): Log = {
>>   Log(pkey = left.pkey,
>>       a = if (left.a != null) left.a else right.a,
>>       b = if (left.a != null) left.b else right.b,
>>       ...
>>   )
>> }
>>
>> // a very large parquet file (>10G, 200 partitions)
>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>
>> // multiple small parquet files
>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>
>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>   .groupByKey(log => new String(log.pkey))  // generate key
>>   .reduceGroups(merge(_, _))                // spark.sql.shuffle.partitions=200
>>   .map(_._2)                                // drop key
>>
>> newSnapshot
>>   .repartition(60)                          // (1)
>>   .write.parquet(newPath)
>> ```
>>
>> The issue we have is that some data were duplicated or lost, and the
>> amounts of duplicated and lost data are similar.
>>
>> We also noticed that this situation only happens if some instances get
>> preempted. Spark retries the stage, so some of the partitioned files are
>> generated on the 1st attempt and the others on the 2nd (retry) attempt.
>> Moreover, the duplicated logs are duplicated exactly twice and located in
>> both batches (one in the first batch and one in the second batch).
>>
>> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
>> standalone deployment. Workers run on GCP preemptible instances and they
>> are preempted very frequently.
>>
>> The pipeline runs in a single long-running process with multiple threads.
>> Each snapshot represents an "hour" of data, and we do the
>> "read-reduce-write" operations on multiple snapshots (hours)
>> simultaneously. We are pretty sure the same snapshot (hour) is never
>> processed in parallel, and the output path is always generated with a
>> timestamp, so those jobs shouldn't affect each other.
>>
>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
>> issue was gone, but I believe there is still a correctness bug that hasn't
>> been reported yet.
>>
>> We have tried to reproduce this bug on a smaller scale but haven't
>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
>> the bug. Since this case uses Dataset, I believe it is unrelated to
>> SPARK-24243.
>>
>> Can anyone give me some advice on this issue?
>> Thanks in advance.
>>
>> Shiao-An Yuan
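For reference, Sean's suggestion about the Array member would look roughly like the sketch below. It only covers the fields shown in the simplified code above, and whether it matters here depends on whether `Log` itself is ever used as a shuffle key:

```
import java.util.Arrays

// Sketch: content-based equality/hashing for a case class with an Array member,
// since the compiler-generated equals/hashCode use the array's reference identity.
case class Log(pkey: Array[Byte], a: String, b: Int) {
  override def hashCode(): Int =
    31 * (31 * Arrays.hashCode(pkey) + (if (a == null) 0 else a.hashCode)) + b

  override def equals(other: Any): Boolean = other match {
    case that: Log =>
      Arrays.equals(this.pkey, that.pkey) && this.a == that.a && this.b == that.b
    case _ => false
  }
}
```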