Hi Sean,

Sorry, I didn't describe it clearly. The column "pkey" is like a "Primary
Key" and I do "reduce by key" on this column, so the number of rows
should always equal the cardinality of "pkey".
When I said data get duplicated & lost, I mean that duplicate "pkey" values
exist in the output file (after "reduce by key") and some "pkey" values are missing.
Since it only happens when executors are preempted, I believe this is the
bug (nondeterministic shuffle) that SPARK-23207 is trying to solve.
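
A minimal sketch of that invariant check on an output snapshot (reusing
`schema`, `newPath`, and `Log` from the code below, and assuming
`spark.implicits._` is in scope):
```
val out = spark.read.schema(schema).parquet(newPath).as[Log]

val totalRows    = out.count()
val distinctKeys = out.map(log => new String(log.pkey)).distinct().count()

// After "reduce by key" these two numbers should be equal;
// a mismatch means some pkey is duplicated and/or missing.
assert(totalRows == distinctKeys)
```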

Thanks,

Shiao-An Yuan

On Tue, Dec 29, 2020 at 10:53 PM Sean Owen <sro...@gmail.com> wrote:

> Total guess here, but your key is a case class. It does define hashCode
> and equals for you, but, you have an array as one of the members. Array
> equality is by reference, so, two arrays of the same elements are not
> equal. You may have to define hashCode and equals manually to make them
> correct.
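>
> A minimal sketch of that approach (a hypothetical Key wrapper, not the
> original Log class), comparing array contents instead of references:
> ```
> import java.util.Arrays
>
> case class Key(pkey: Array[Byte]) {
>   override def equals(other: Any): Boolean = other match {
>     case that: Key => Arrays.equals(this.pkey, that.pkey)
>     case _         => false
>   }
>   override def hashCode(): Int = Arrays.hashCode(pkey)
> }
> ```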
>
> On Tue, Dec 29, 2020 at 8:01 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
> wrote:
>
>> Hi folks,
>>
>> We recently identified a data correctness issue in our pipeline.
>>
>> The data processing flow is as follows:
>> 1. read the current snapshot (use an empty Dataset if it doesn't exist yet)
>> 2. read the unprocessed new data
>> 3. union them and do a `reduceByKey` operation
>> 4. output a new version of the snapshot
>> 5. repeat steps 1-4
>>
>> A simplified version of the code:
>> ```
>> // schema
>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>
>> // function for reduce: prefer the left record's fields when present
>> def merge(left: Log, right: Log): Log = {
>>   Log(pkey = left.pkey,
>>       a    = if (left.a != null) left.a else right.a,
>>       b    = if (left.a != null) left.b else right.b,
>>       ...
>>   )
>> }
>>
>> // a very large parquet file (>10G, 200 partitions)
>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>
>> // multiple small parquet files
>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>
>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>   .groupByKey(log => new String(log.pkey))  // generate key
>>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>>   .map(_._2)                                // drop key
>>
>> newSnapshot
>>   .repartition(60)                          // (1)
>>   .write.parquet(newPath)
>> ```
>>
>> The issue we have is that some data get duplicated or lost, and the
>> amounts of duplicated and lost data are similar.
>>
>> We also noticed that this situation only happens if some instances get
>> preempted. Spark retries the stage, so some of the partitioned files are
>> generated by the 1st attempt and the other files are generated by the
>> 2nd (retry) attempt.
>> Moreover, those duplicated logs are duplicated exactly twice and located in
>> both batches (one in the first batch; one in the second batch).
>>
>> The input/output files are parquet on GCS. The Spark version is 2.4.4 with
>> standalone deployment. Workers run on GCP preemptible instances and are
>> preempted very frequently.
>>
>> The pipeline runs in a single long-running, multi-threaded process. Each
>> snapshot represents an "hour" of data, and we do the "read-reduce-write"
>> operations on multiple snapshots (hours) simultaneously. We are pretty sure
>> the same snapshot (hour) is never processed in parallel, and the output path
>> is always generated with a timestamp, so those jobs shouldn't affect each other.
>>
>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
>> issue went away, but I believe there is still a correctness bug that hasn't
>> been reported yet.
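>>
>> For reference, a sketch of the changed line (1), hash-partitioning by the
>> key instead of the order-dependent round-robin `repartition(60)`:
>> ```
>> newSnapshot
>>   .repartition(100, $"pkey")   // rows are routed to partitions by hashing pkey
>>   .write.parquet(newPath)
>> ```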
>>
>> We have tried to reproduce this bug on a smaller scale but haven't
>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't find
>> the bug. Since this case uses Dataset, I believe it is unrelated to SPARK-24243.
>>
>> Can anyone give me some advice on this issue?
>> Thanks in advance.
>>
>> Shiao-An Yuan
>>
>
