I don't think this addresses my comment at all. Please try correctly
implementing equals and hashCode for your key class first.
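
For example, a minimal sketch of what that might look like, assuming the key
class simply wraps the Array[Byte] pkey from your schema (the LogKey name is
hypothetical):

```
// Hypothetical key class wrapping the Array[Byte] pkey from the Log schema.
// A case class's generated equals/hashCode delegate to Array's
// reference-based versions, so both are overridden here to compare the
// bytes by content.
case class LogKey(pkey: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case that: LogKey => java.util.Arrays.equals(this.pkey, that.pkey)
    case _            => false
  }
  override def hashCode(): Int = java.util.Arrays.hashCode(pkey)
}
```

With content-based equality, two keys built from the same bytes compare equal
and hash to the same shuffle partition.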

On Tue, Dec 29, 2020 at 8:31 PM Shiao-An Yuan <shiao.an.y...@gmail.com>
wrote:

> Hi Sean,
>
> Sorry, I didn't describe it clearly. The column "pkey" is like a "Primary
> Key", and I "reduce by key" on this column, so the number of rows should
> always equal the cardinality of "pkey".
> When I say data get duplicated and lost, I mean that duplicated "pkey"
> values exist in the output file (after the "reduce by key") and some "pkey"
> values are missing.
> Since it only happens when executors are preempted, I believe this is a bug
> (a nondeterministic shuffle) of the kind SPARK-23207 is trying to solve.
>
> Thanks,
>
> Shiao-An Yuan
>
> On Tue, Dec 29, 2020 at 10:53 PM Sean Owen <sro...@gmail.com> wrote:
>
>> Total guess here, but your key is a case class. It does define hashCode
>> and equals for you, but you have an array as one of the members. Array
>> equality is by reference, so two arrays with the same elements are not
>> equal. You may have to define hashCode and equals manually to make them
>> correct.
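>>
>> For instance, a quick sketch of the behavior I mean:
>>
>> ```
>> Array(1, 2, 3) == Array(1, 2, 3)                  // false: == on arrays compares references
>> case class Key(pkey: Array[Byte])
>> Key(Array[Byte](1, 2)) == Key(Array[Byte](1, 2))  // also false, for the same reason
>> ```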
>>
>> On Tue, Dec 29, 2020 at 8:01 AM Shiao-An Yuan <shiao.an.y...@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> We recently identified a data correctness issue in our pipeline.
>>>
>>> The data processing flow is as follows:
>>> 1. read the current snapshot (or start from an empty one if it doesn't exist yet)
>>> 2. read unprocessed new data
>>> 3. union them and do a `reduceByKey` operation
>>> 4. output a new version of the snapshot
>>> 5. repeat step 1~4
>>>
>>> A simplified version of the code:
>>> ```
>>> // schema
>>> case class Log(pkey: Array[Byte], a: String, b: Int, /* 100+ columns */)
>>>
>>> // function for reduce
>>> def merge(left: Log, right: Log): Log = {
>>>   Log(pkey = left.pkey,
>>>       a    = if (left.a != null) left.a else right.a,
>>>       b    = if (left.a != null) left.b else right.b,
>>>       ...
>>>   )
>>> }
>>>
>>> // a very large parquet file (>10G, 200 partitions)
>>> val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> // multiple small parquet files
>>> val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]
>>>
>>> val newSnapshot = currentSnapshot.union(newAddedLogs)
>>>   .groupByKey(log => new String(log.pkey))  // generate key
>>>   .reduceGroups(merge _)                    // spark.sql.shuffle.partitions=200
>>>   .map(_._2)                                // drop key
>>>
>>> newSnapshot
>>>   .repartition(60)                          // (1)
>>>   .write.parquet(newPath)
>>> ```
>>>
>>> The issue we have is that some data are duplicated or lost, and the
>>> amounts of duplicated and lost data are similar.
>>>
>>> We also noticed that this situation only happens when some instances get
>>> preempted. Spark retries the stage, so some of the partitioned output
>>> files are generated by the first attempt and the others by the second
>>> (retry) attempt.
>>> Moreover, the duplicated logs appear exactly twice, once in each batch
>>> (one in the files from the first attempt and one in the files from the
>>> retry).
>>>
>>> The input/output files are Parquet on GCS. The Spark version is 2.4.4
>>> with standalone deployment. The workers run on GCP preemptible instances
>>> and are preempted very frequently.
>>>
>>> The pipeline runs in a single long-running, multi-threaded process. Each
>>> snapshot represents an "hour" of data, and we run the "read-reduce-write"
>>> operations on multiple snapshots (hours) simultaneously. We are pretty
>>> sure the same snapshot (hour) is never processed in parallel, and the
>>> output path is always generated with a timestamp, so those jobs shouldn't
>>> affect each other.
>>>
>>> After changing line (1) to `coalesce` or `repartition(100, $"pkey")`, the
>>> issue was gone, but I believe there is still a correctness bug that
>>> hasn't been reported yet.
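>>>
>>> For reference, a sketch of what line (1) looks like with the key-based
>>> repartition (a hash partitioning on "pkey", which does not depend on the
>>> row order within a partition the way the round-robin `repartition(60)`
>>> does):
>>>
>>> ```
>>> newSnapshot
>>>   .repartition(100, $"pkey")   // hash-partition by key instead of round-robin
>>>   .write.parquet(newPath)
>>> ```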
>>>
>>> We have tried to reproduce this bug on a smaller scale but haven't
>>> succeeded yet. I have read SPARK-23207 and SPARK-28699, but couldn't
>>> find the bug there.
>>> Since this case uses the Dataset API, I believe it is unrelated to
>>> SPARK-24243.
>>>
>>> Can anyone give me some advice on this issue?
>>> Thanks in advance.
>>>
>>> Shiao-An Yuan
>>>
>>
