Hi folks,
I finally found the root cause of this issue.
It can be easily reproduced by the following code.
We ran it in standalone mode on 4 instances with 4 cores each (16 cores in
total).
```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing

val murmur3 = Hashing.murmur3_32()

// create a Dataset whose second element has a cardinality of 50000
val ds = spark.range(0, 100000, 1, 130)
  .map(i => (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])
  .repartition(200)
  .map { x =>
    // kill the executor on the first attempt of partition 100 of the
    // first stage attempt, forcing a stage retry
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  // the correct result is 50000, but we always got a smaller number
  .map(_._2).distinct().count()
```
The problem is that SPARK-23207 uses sorting to make RoundRobinPartitioning
always produce the same distribution, but the aggregate function `first` can
return non-deterministic results, which makes the sort output
non-deterministic as well. As a result, the original stage attempt and the
retried attempt may produce different distributions, which causes both
duplicated and lost rows.
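If this analysis is right, a deterministic aggregate should avoid the
problem, because the sort that SPARK-23207 relies on then sees exactly the
same rows on every attempt. For example, replacing `first` with `min` in the
repro above (just a sketch, under that assumption):
```
ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])   // deterministic, unlike first
  .repartition(200)
  .map { x =>
    // same executor-killing step as in the repro above
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  // should now be 50000 on every run, even with the kill
  .map(_._2).distinct().count()
```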
Thanks,
Shiao-An Yuan
On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <[email protected]>
wrote:
> Hi folks,
>
> We recently identified a data correctness issue in our pipeline.
>
> The data processing flow is as follows:
> 1. read the current snapshot (or an empty one if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat steps 1-4
>
> A simplified version of the code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int /* 100+ columns */)
>
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey,
>       a = if (left.a != null) left.a else right.a,
>       b = if (left.a != null) left.b else right.b,
>       ...
>   )
> }
>
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey))  // generate key
>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>   .map(_._2)                                // drop key
>
> newSnapshot
>   .repartition(60)                          // (1)
>   .write.parquet(newPath)
> ```
>
> The issue we have is that some data is duplicated or lost, and the amounts
> of duplicated and lost data are similar.
>
> We also noticed that this situation only happens when some instances get
> preempted. Spark retries the stage, so some of the partitioned files are
> generated by the first attempt and the others by the second (retry)
> attempt. Moreover, each duplicated log appears exactly twice, once in each
> batch (one in the files from the first attempt and one in the files from
> the second).
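>
> A quick way to see this pattern is to read a written snapshot back and
> check which part-files each duplicated `pkey` comes from, for example
> (just a sketch):
> ```
> import org.apache.spark.sql.functions._
>
> // list the part-files each duplicated pkey came from
> spark.read.parquet(newPath)
>   .withColumn("file", input_file_name())
>   .groupBy($"pkey")
>   .agg(count(lit(1)).as("cnt"), collect_set($"file").as("files"))
>   .filter($"cnt" > 1)
>   .show(truncate = false)
> ```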
>
> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
> a standalone deployment. The workers run on GCP preemptible instances and
> are preempted very frequently.
>
> The pipeline runs in a single long-running, multi-threaded process. Each
> snapshot represents an "hour" of data, and we run the "read-reduce-write"
> steps on multiple snapshots (hours) simultaneously. We are pretty sure the
> same snapshot (hour) is never processed in parallel, and the output path
> always includes a timestamp, so these jobs shouldn't affect each other.
>
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
> issue went away, but I believe there is still a correctness bug that hasn't
> been reported yet.
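>
> For reference, the keyed variant of line (1) looks like this (hash
> partitioning by `pkey` instead of round-robin):
> ```
> newSnapshot
>   .repartition(100, $"pkey")   // (1) hash partitioning by pkey
>   .write.parquet(newPath)
> ```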
>
> We have tried to reproduce this bug on a smaller scale but haven't
> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
> the bug. Since this case uses the Dataset API, I believe it is unrelated to
> SPARK-24243.
>
> Can anyone give me some advice on this issue?
> Thanks in advance.
>
> Shiao-An Yuan
>