I don't think this addresses my comment at all. Please try correctly implementing equals and hashCode for your key class first.
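For concreteness, a minimal sketch of the fix Sean is suggesting, assuming the grouping key is (or wraps) the Array[Byte] "pkey" column; the class name below is illustrative, not taken from the thread. java.util.Arrays compares the byte contents, whereas the compiler-generated case-class equals/hashCode fall back to reference equality and identity hashing for an Array member. A second sketch after the quoted thread demonstrates the pitfall itself.

```
// Hypothetical key class sketching Sean's suggestion (the name is illustrative).
// java.util.Arrays.equals/hashCode compare the bytes themselves; the generated
// case-class equals/hashCode treat an Array member by reference/identity.
final case class LogKey(pkey: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case that: LogKey => java.util.Arrays.equals(this.pkey, that.pkey)
    case _            => false
  }
  override def hashCode(): Int = java.util.Arrays.hashCode(pkey)
}
```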
On Tue, Dec 29, 2020 at 8:31 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:

> Hi Sean,
>
> Sorry, I didn't describe it clearly. The column "pkey" is like a "Primary
> Key" and I do "reduce by key" on this column, so the "amount of rows"
> should always equal the "cardinality of pkey".
> When I said data get duplicated & lost, I mean that duplicated "pkey"
> values exist in the output file (after "reduce by key") and some "pkey"
> values are missing.
> Since it only happens when executors are preempted, I believe this is a
> bug (a nondeterministic shuffle) that SPARK-23207 is trying to solve.
>
> Thanks,
>
> Shiao-An Yuan
>
> On Tue, Dec 29, 2020 at 10:53 PM Sean Owen <sro...@gmail.com> wrote:
>
>> Total guess here, but your key is a case class. It does define hashCode
>> and equals for you, but you have an array as one of the members. Array
>> equality is by reference, so two arrays of the same elements are not
>> equal. You may have to define hashCode and equals manually to make them
>> correct.
>>
>> On Tue, Dec 29, 2020 at 8:01 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> We recently identified a data correctness issue in our pipeline.
>>>
>>> The data processing flow is as follows:
>>> 1. read the current snapshot (provide an empty one if it doesn't exist yet)
>>> 2. read unprocessed new data
>>> 3. union them and do a `reduceByKey` operation
>>> 4. output a new version of the snapshot
>>> 5. repeat steps 1~4
>>>
>>> A simplified version of the code:
>>> ```
>>> // schema
>>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>>
>>> // function for reduce
>>> def merge(left: Log, right: Log): Log = {
>>>   Log(pkey = left.pkey,
>>>       a = if (left.a != null) left.a else right.a,
>>>       b = if (left.a != null) left.b else right.b,
>>>       ...
>>>   )
>>> }
>>>
>>> // a very large parquet file (>10G, 200 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>>   .groupByKey(log => new String(log.pkey))  // generate key
>>>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>>>   .map(_._2)                                // drop key
>>>
>>> newSnapshot
>>>   .repartition(60)                          // (1)
>>>   .write.parquet(newPath)
>>> ```
>>>
>>> The issue we have is that some data were duplicated or lost, and the
>>> amounts of duplicated and lost data are similar.
>>>
>>> We also noticed that this situation only happens if some instances get
>>> preempted. Spark retries the stage, so some of the partitioned files
>>> are generated on the 1st attempt and other files are generated on the
>>> 2nd (retry) attempt. Moreover, the duplicated logs are duplicated
>>> exactly twice and located in both batches (one in the first batch; one
>>> in the second batch).
>>>
>>> The input/output files are parquet on GCS. The Spark version is 2.4.4
>>> with standalone deployment. Workers run on GCP preemptible instances
>>> and they are preempted very frequently.
>>>
>>> The pipeline runs in a single long-running process with multiple
>>> threads; each snapshot represents an "hour" of data, and we do the
>>> "read-reduce-write" operations on multiple snapshots (hours)
>>> simultaneously. We are pretty sure the same snapshot (hour) is never
>>> processed in parallel, and the output path is always generated with a
>>> timestamp, so those jobs shouldn't affect each other.
>>>
>>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`,
>>> the issue was gone, but I believe there is still a correctness bug
>>> that hasn't been reported yet.
>>>
>>> We have tried to reproduce this bug on a smaller scale but haven't
>>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't
>>> find the bug. Since this case uses Dataset, I believe it is unrelated
>>> to SPARK-24243.
>>>
>>> Can anyone give me some advice on this?
>>> Thanks in advance.
>>>
>>> Shiao-An Yuan
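To see the array-equality behavior Sean describes in his first reply, a quick REPL check (a sketch, not taken from the thread): two arrays with identical contents are neither equal nor hashed alike, so any key that compares an Array this way can scatter logically identical keys across groups.

```
val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)

a == b                     // false: Array equality is by reference
a.sameElements(b)          // true:  the contents are identical
a.hashCode == b.hashCode   // almost certainly false: identity-based hashing
```

If the key must carry the raw bytes, keying by a value-equal type (for example log.pkey.toSeq, an assumption rather than anything proposed in the thread) is an alternative to hand-written equals and hashCode; whether the key is in fact the cause of the preemption-related duplication is the question the thread leaves open.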