Hi Shiao-An Yuan,

I also hit this correctness problem in my production environment.
My Spark version is 2.3.1, so I previously assumed it was caused by SPARK-23243.
But you see the same problem in your environment on 2.4.4, which already contains
the fix for SPARK-23243, so maybe this problem is not caused by SPARK-23243.

As you said, if it is caused by `first` before `repartition`, how can this problem
be solved fundamentally? And is there any workaround in the meantime?
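
For what it's worth, the workaround I am considering on my side is to make the
aggregate deterministic, e.g. replacing `first` with `min` or `max`. This is just a
sketch against the repro quoted below (not verified yet; `ds` and the imports come
from your snippet):

```
// `first` picks an arbitrary row per group, so its result can change between the
// original and the retried stage; `min` always returns the same value for a group,
// so the sort added by SPARK-23207 should see a stable order.
ds.groupByKey(_._2)
  .agg(min($"_1").as[Long])
  .repartition(200)
```

Would that be considered a fundamental fix, or is avoiding the round-robin
`repartition` (as you describe below with `coalesce` / `repartition(100, $"pkey")`)
the only reliable option?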


> On Jan 18, 2021, at 10:35 AM, Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
> 
> Hi, 
> I am using Spark 2.4.4 standalone mode.
> 
> On Mon, Jan 18, 2021 at 4:26 AM Sean Owen <sro...@gmail.com> wrote:
> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
> 
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
> Hi folks,
> 
> I finally found the root cause of this issue.
> It can be easily reproduced by the following code.
> We ran it on a standalone-mode environment with 4 cores * 4 instances (16 cores in total).
> 
> ```
> import org.apache.spark.TaskContext
> import scala.sys.process._
> import org.apache.spark.sql.functions._
> import com.google.common.hash.Hashing
> val murmur3 = Hashing.murmur3_32()
> 
> // create a Dataset whose second element has a cardinality of 50000
> val ds = spark.range(0, 100000, 1, 130).map(i => (murmur3.hashLong(i).asInt(), i/2))
> 
> ds.groupByKey(_._2)
>   .agg(first($"_1").as[Long])
>   .repartition(200)
>   .map { x =>
>     // kill an executor on the first attempt of this stage to force a stage retry
>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 &&
>         TaskContext.get.stageAttemptNumber == 0) {
>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>     }
>     x
>   }
>   .map(_._2).distinct().count()   // the correct result is 50000, but we always get a smaller number
> ```
> 
> The problem here is that SPARK-23207 uses sorting to make RoundRobinPartitioning
> always generate the same distribution, but the aggregate function `first` may
> return non-deterministic results, which makes the sort order non-deterministic.
> Therefore, the original stage and the retried stage might produce different
> distributions, causing both duplication and loss of rows.
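> 
> As a sanity check (an untested sketch, assuming the same session and imports as
> the repro above), replacing `first` with a deterministic aggregate such as `max`
> should make the count stable across the stage retry:
> 
> ```
> ds.groupByKey(_._2)
>   .agg(max($"_1").as[Long])       // deterministic per group, unlike `first`
>   .repartition(200)
>   .map { x =>                     // same executor-kill trick as above
>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 &&
>         TaskContext.get.stageAttemptNumber == 0) {
>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>     }
>     x
>   }
>   .map(_._2).distinct().count()   // no duplicated or lost rows expected
> ```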
> 
> Thanks,
> Shiao-An Yuan
> 
> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com> wrote:
> Hi folks,
> 
> We recently identified a data correctness issue in our pipeline.
> 
> The data processing flow is as follows:
> 1. read the current snapshot (or an empty Dataset if it doesn't exist yet)
> 2. read unprocessed new data
> 3. union them and do a `reduceByKey` operation
> 4. output a new version of the snapshot
> 5. repeat step 1~4
> 
> The simplified version of code:
> ```
> // schema
> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
> 
> // function for reduce
> def merge(left: Log, right: Log): Log = {
>   Log(pkey = left.pkey,
>       a    = if (left.a!=null) left.a else right.a,
>       b    = if (left.a!=null) left.b else right.b,
>       ...
>   )
> }
> 
> // a very large parquet file (>10G, 200 partitions)
> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]   
> 
> // multiple small parquet files
> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
> 
> val newSnapshot = currentSnapshot.union(newAddedLogs)
>   .groupByKey(log => new String(log.pkey))       // generate key
>   .reduceGroups(_.merge(_))                      // spark.sql.shuffle.partitions=200
>   .map(_._2)                                     // drop key
> 
> newSnapshot
>   .repartition(60)                              // (1)
>   .write.parquet(newPath)
> ```
> 
> The issue we have is that some data are duplicated or lost, and the amounts of
> duplicated and lost data are similar.
> 
> We also noticed that this situation only happens when some instances get
> preempted. Spark then retries the stage, so some of the partitioned output files
> are generated by the first attempt and the others by the second (retry) attempt.
> Moreover, the duplicated logs appear exactly twice, once in each batch (one in
> the first batch and one in the second batch).
> 
> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with a
> standalone deployment. The workers run on GCP preemptible instances and are
> preempted very frequently.
> 
> The pipeline runs in a single long-running, multi-threaded process. Each snapshot
> represents an "hour" of data, and we run the "read-reduce-write" operations on
> multiple snapshots (hours) simultaneously. We are pretty sure the same
> snapshot (hour) is never processed in parallel, and the output path is always
> generated with a timestamp, so those jobs shouldn't affect each other.
> 
> After changing line (1) to `coalesce` or `repartition(100, $"pkey")` the issue
> was gone, but I believe there is still a correctness bug that hasn't been
> reported yet.
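> 
> Concretely, the two variants of line (1) we tried are sketched below (the
> `coalesce` count of 60 is an assumption here, matching the original line (1)):
> 
> ```
> // variant A: merge into 60 output files without another shuffle
> newSnapshot
>   .coalesce(60)
>   .write.parquet(newPath)
> 
> // variant B: hash-partition on the key instead of round-robin
> newSnapshot
>   .repartition(100, $"pkey")
>   .write.parquet(newPath)
> ```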
> 
> We have tried to reproduce this bug on a smaller scale but haven't succeeded yet.
> I have read SPARK-23207 and SPARK-28699, but couldn't find the bug.
> Since this case uses Dataset, I believe it is unrelated to SPARK-24243.
> 
> Can anyone give me some advice on how to track down this issue?
> Thanks in advance.
> 
> Shiao-An Yuan
