Hi,
I am using Spark 2.4.4 in standalone mode.

On Mon, Jan 18, 2021 at 4:26 AM Sean Owen <sro...@gmail.com> wrote:

> Hm, FWIW I can't reproduce that on Spark 3.0.1. What version are you using?
>
> On Sun, Jan 17, 2021 at 6:22 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> I finally found the root cause of this issue.
>> It can be easily reproduced with the following code.
>> We ran it in a standalone-mode environment with 4 cores * 4 instances
>> (16 cores in total).
>>
>> ```
>> import org.apache.spark.TaskContext
>> import scala.sys.process._
>> import org.apache.spark.sql.functions._
>> import com.google.common.hash.Hashing
>> val murmur3 = Hashing.murmur3_32()
>>
>> // create a Dataset where the cardinality of the second element equals 50000.
>> val ds = spark.range(0, 100000, 1, 130)
>>   .map(i => (murmur3.hashLong(i).asInt(), i/2))
>>
>> ds.groupByKey(_._2)
>>   .agg(first($"_1").as[Long])
>>   .repartition(200)
>>   .map { x =>
>>     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 100 && TaskContext.get.stageAttemptNumber == 0) {
>>       throw new Exception("pkill -f CoarseGrainedExecutorBackend".!!)
>>     }
>>     x
>>   }
>>   .map(_._2).distinct().count()   // the correct result is 50000, but we always get a smaller number
>> ```
>>
>> The problem here is that SPARK-23207 uses sorting to make
>> RoundRobinPartitioning always generate the same distribution,
>> but the UDAF `first` may return non-deterministic results, which makes
>> the sort order non-deterministic as well.
>> Therefore, the first stage and the retried stage might produce different
>> distributions, which causes duplicated and lost rows.
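>>
>> For what it's worth, replacing the non-deterministic `first` with a
>> deterministic aggregate such as `min` should make the rows feeding the
>> round-robin repartition identical on every attempt, so the SPARK-23207 sort
>> should yield the same distribution across retries. A minimal sketch
>> (untested), reusing `ds` and the imports from the repro above:
>>
>> ```
>> // Hypothetical variant of the repro: `min` is deterministic, so the sorted
>> // input to the round-robin repartition should be identical on the retried
>> // stage and every row should land in the same output partition.
>> ds.groupByKey(_._2)
>>   .agg(min($"_1").as[Long])
>>   .repartition(200)
>>   .map(_._2).distinct().count()   // expected to stay at 50000 across retries
>> ```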
>>
>> Thanks,
>> Shiao-An Yuan
>>
>> On Tue, Dec 29, 2020 at 10:00 PM Shiao-An Yuan <shiao.an.y...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> We recently identified a data correctness issue in our pipeline.
>>>
>>> The data processing flow is as follows:
>>> 1. read the current snapshot (provide empty if it doesn't exist yet)
>>> 2. read unprocessed new data
>>> 3. union them and do a `reduceByKey` operation
>>> 4. output a new version of the snapshot
>>> 5. repeat step 1~4
>>>
>>> The simplified version of code:
>>> ```
>>> // schema
>>> case class Log(pkey: Array[Byte], a: String, b: Int /* ..., 100+ more columns */)
>>>
>>> // function for reduce
>>> def merge(left: Log, right: Log): Log = {
>>>   Log(pkey = left.pkey,
>>>       a    = if (left.a != null) left.a else right.a,
>>>       b    = if (left.a != null) left.b else right.b,
>>>       ...
>>>   )
>>> }
>>>
>>> // a very large parquet file (>10G, 200 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>>   .groupByKey(log => new String(log.pkey))       // generate key
>>>   .reduceGroups(merge _)                         // spark.sql.shuffle.partitions=200
>>>   .map(_._2)                                     // drop key
>>>
>>> newSnapshot
>>>   .repartition(60)                              // (1)
>>>   .write.parquet(newPath)
>>> ```
>>>
>>> The issue we have is that some data were duplicated or lost, and the
>>> amounts of duplicated and lost data are similar.
>>>
>>> We also noticed that this situation only happens when some instances get
>>> preempted. Spark retries the stage, so some of the partitioned files are
>>> generated on the 1st attempt, and the other files are generated on the
>>> 2nd (retry) attempt.
>>> Moreover, the duplicated logs are duplicated exactly twice and located in
>>> both batches (one in the first batch and one in the second batch).
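>>>
>>> (A rough, illustrative sketch of how such duplicates can be confirmed;
>>> `newPath` and `schema` are the ones from the snippet above, and
>>> `input_file_name` is only used to see which output file each copy came from.)
>>>
>>> ```
>>> // Illustrative check, not part of the pipeline itself: count how many times
>>> // each pkey appears in the new snapshot and which files the copies live in.
>>> import org.apache.spark.sql.functions.{col, collect_set, count, input_file_name, lit}
>>> spark.read.schema(schema).parquet(newPath)
>>>   .withColumn("file", input_file_name())
>>>   .groupBy(col("pkey"))
>>>   .agg(count(lit(1)).as("copies"), collect_set("file").as("files"))
>>>   .filter(col("copies") > 1)
>>>   .show(false)
>>> ```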
>>>
>>> The input/output files are Parquet on GCS. The Spark version is 2.4.4 with
>>> a standalone deployment. The workers run on GCP preemptible instances and
>>> are preempted very frequently.
>>>
>>> The pipeline runs as a single long-running, multi-threaded process; each
>>> snapshot represents an "hour" of data, and we run the "read-reduce-write"
>>> operations on multiple snapshots (hours) simultaneously. We are pretty sure
>>> the same snapshot (hour) is never processed in parallel, and the output path
>>> is always generated with a timestamp, so those jobs shouldn't affect each
>>> other.
>>>
>>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
>>> issue was gone, but I believe there is still a correctness bug that hasn't
>>> been reported yet.
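>>>
>>> In other words, either of the following variants of line (1) makes the issue
>>> disappear for us (a sketch based on the simplified code above; the coalesce
>>> parameter is just illustrative):
>>>
>>> ```
>>> // Variant A: avoid the extra round-robin shuffle entirely.
>>> newSnapshot
>>>   .coalesce(60)
>>>   .write.parquet(newPath)
>>>
>>> // Variant B: hash-partition on the primary key, which is deterministic
>>> // across stage retries (unlike the sort-based round-robin distribution).
>>> newSnapshot
>>>   .repartition(100, $"pkey")
>>>   .write.parquet(newPath)
>>> ```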
>>>
>>> We have tried to reproduce this bug on a smaller scale but haven't
>>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
>>> the bug there. Since this case uses the Dataset API, I believe it is
>>> unrelated to SPARK-24243.
>>>
>>> Can anyone give me some advice on this issue?
>>> Thanks in advance.
>>>
>>> Shiao-An Yuan
>>>
>>
