Hi folks,

I finally found the root cause of this issue.
It can be easily reproduced with the following code.
We ran it in standalone mode on 4 instances with 4 cores each (16 cores in
total).

```
import org.apache.spark.TaskContext
import scala.sys.process._
import org.apache.spark.sql.functions._
import com.google.common.hash.Hashing
val murmur3 = Hashing.murmur3_32()

// Create a Dataset whose second element has a cardinality of 50000.
val ds = spark.range(0, 100000, 1, 130)
  .map(i => (murmur3.hashLong(i).asInt(), i / 2))

ds.groupByKey(_._2)
  .agg(first($"_1").as[Long])     // `first` is non-deterministic
  .repartition(200)
  .map { x =>
    // Kill an executor during the first attempt to force a stage retry.
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()   // the correct result is 50000, but we
                                  // always get a smaller number
```

The problem here is that SPARK-23207 uses a local sort to make
RoundRobinPartitioning always produce the same row distribution, but the
aggregate function `first` may return non-deterministic results, which makes
the sort output non-deterministic as well. (The `pkill` in the repro kills an
executor during the first attempt, forcing Spark to recompute part of the
repartitioned stage.) As a result, the original stage and the retried stage
can end up with different distributions, which causes both duplicated and
lost rows.
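
For what it's worth, here is a minimal sketch (same setup and failure
injection as the repro above, and assuming nothing else in the job is
non-deterministic) with the deterministic aggregate `min` in place of `first`.
With a deterministic aggregate, the sort added by SPARK-23207 should see the
same rows on the original attempt and on the retry, so the round-robin
repartition should assign every row to the same output partition both times:

```
// Same `ds` and imports as above, but with `min` instead of the
// non-deterministic `first`.
ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])       // deterministic aggregate
  .repartition(200)
  .map { x =>
    if (TaskContext.get.attemptNumber == 0 &&
        TaskContext.get.partitionId == 100 &&
        TaskContext.get.stageAttemptNumber == 0) {
      throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
    }
    x
  }
  .map(_._2).distinct().count()   // should now consistently return 50000
```

Of course this only sidesteps the symptom; round-robin repartitioning of a
non-deterministic operator's output still looks unsafe in general.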

Thanks,
Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com>
wrote:

> Hi folks,
>
> We recently identified a data correctness issue in our pipeline.
>
> The data processing flow is as follows:
> 1. read the current snapshot (or an empty Dataset if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat step 1~4
>
> A simplified version of the code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey,
>       a    = if (left.a != null) left.a else right.a,
>       b    = if (left.a != null) left.b else right.b,
>       ...
>   )
> }
>
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey))       // generate key
>   .reduceGroups(merge _)                         // spark.sql.shuffle.partitions=200
>   .map(_._2)                                     // drop key
>
> newSnapshot
>   .repartition(60)                              // (1)
>   .write.parquet(newPath)
> ```
>
> The issue we have is that some data were duplicated or lost, and the
> amounts of duplicated and lost data are similar.
>
> We also noticed that this situation only happens when some instances get
> preempted. Spark then retries the stage, so some of the partitioned output
> files are generated by the first attempt and the others by the second
> (retry) attempt.
> Moreover, the duplicated logs are always duplicated exactly twice, with one
> copy in each batch (one in the first batch and one in the second).
>
> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
> a standalone deployment. The workers run on GCP preemptible instances and
> are preempted very frequently.
>
> The pipeline runs in a single long-running process with multiple threads;
> each snapshot represents an "hour" of data, and we run the
> "read-reduce-write" operations on multiple snapshots (hours) simultaneously.
> We are pretty sure the same snapshot (hour) is never processed in parallel,
> and the output path is always generated with a timestamp, so those jobs
> shouldn't affect each other.
>
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
> issue was gone, but I believe there is still a correctness bug that hasn't
> been reported yet.
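>
> For reference, a sketch of the change at line (1) that made the issue go
> away (hash partitioning on the key, unlike round-robin, does not depend on
> the order in which rows arrive):
> ```
> newSnapshot
>   .repartition(100, $"pkey")                  // (1) changed from repartition(60)
>   .write.parquet(newPath)
> ```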
>
> We have tried to reproduce this bug on a smaller scale but haven't
> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
> the bug. Since this case uses the Dataset API, I believe it is unrelated to
> SPARK-24243.
>
> Can anyone give me some advice about this issue?
> Thanks in advance.
>
> Shiao-An Yuan
>
