Hi folks,

We recently identified a data correctness issue in our pipeline.

The data processing flow is as follows:
1. read the current snapshot (or an empty dataset if it doesn't exist yet)
2. read unprocessed new data
3. union them and do a `reduceByKey` operation
4. output a new version of the snapshot
5. repeat steps 1~4

A simplified version of the code:
```
// schema
case class Log(pkey: Array[Byte], a: String, b: Int /* , 100+ more columns */)

// function for reduce
def merge(left: Log, right: Log): Log = {
  Log(pkey = left.pkey,
      a    = if (left.a != null) left.a else right.a,
      b    = if (left.a != null) left.b else right.b,
      ...
  )
}

// a very large parquet file (>10G, 200 partitions)
val currentSnapshot = spark.read.schema(schema).parquet(...).as[Log]

// multiple small parquet files
val newAddedLogs = spark.read.schema(schema).parquet(...).as[Log]

val newSnapshot = currentSnapshot.union(newAddedLogs)
  .groupByKey(log => new String(log.pkey))       // generate key
  .reduceGroups(merge _)                         // spark.sql.shuffle.partitions = 200
  .map(_._2)                                     // drop key

newSnapshot
  .repartition(60)                               // (1)
  .write.parquet(newPath)
```

The issue we see is that some records were duplicated and some were lost, and the
amounts of duplicated and lost data are similar.

We also noticed that this situation only happens when some instances get
preempted. Spark then retries the stage, so some of the output part files are
generated by the first attempt and the rest by the second (retry) attempt.
Moreover, each duplicated record appears exactly twice, with one copy in each
batch (one in a file from the first attempt and one in a file from the retry).
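
As a minimal sketch (not part of the original job), this is the kind of check
that exposes that pattern, assuming the snapshot written to `newPath` above;
`input_file_name()` tells us which part file, and therefore which attempt, each
copy came from:

```
import org.apache.spark.sql.functions.{base64, col, count, countDistinct, input_file_name}

val written = spark.read.schema(schema).parquet(newPath)
  .withColumn("file", input_file_name())                  // part file each row came from
  .withColumn("key", base64(col("pkey")))                 // printable form of the binary key

val dupKeys = written
  .groupBy(col("key"))
  .agg(count("*").as("copies"), countDistinct("file").as("files"))
  .filter(col("copies") > 1)                               // keys that appear more than once

// In the failure case described above, each duplicated key shows
// copies == 2 and files == 2 (one file per attempt).
dupKeys.show(20, truncate = false)
```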

The input/output files are Parquet on GCS. The Spark version is 2.4.4 with a
standalone deployment. The workers run on GCP preemptible instances and are
preempted very frequently.

The pipeline runs in a single long-running process with multiple threads. Each
snapshot represents an "hour" of data, and we run the "read-reduce-write"
operations on multiple snapshots (hours) simultaneously. We are pretty sure the
same snapshot (hour) is never processed in parallel, and the output path is
always generated with a timestamp, so those jobs shouldn't affect each other.
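
A purely hypothetical sketch of that per-hour, timestamped output layout (the
helper name, bucket placeholder, and path structure are made up for
illustration; only the idea comes from the description above):

```
import java.time.Instant

// Hypothetical: each hourly job derives a unique output path from its hour plus
// a run timestamp, so concurrently running jobs never write to the same location.
def snapshotOutputPath(hour: String): String =
  s"gs://<bucket>/snapshots/hour=$hour/run=${Instant.now().toEpochMilli}"
```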

After changing the line (1) to `coalesce` or `repartition(100, $"pkey")`
the issue
was gone, but I believe there is still a correctness bug that hasn't been
reported yet.
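
For reference, the two write variants that made the issue disappear look
roughly like this (sketches only; `newSnapshot` and `newPath` are from the
snippet above, the partition counts are illustrative, and `$"pkey"` needs
`import spark.implicits._`):

```
import spark.implicits._   // for the $"pkey" column syntax

// Variant A: coalesce the existing partitions instead of a round-robin repartition
newSnapshot
  .coalesce(60)
  .write.parquet(newPath)

// Variant B: hash-partition by the key column before writing
newSnapshot
  .repartition(100, $"pkey")
  .write.parquet(newPath)
```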

We have tried to reproduce this bug at a smaller scale but haven't succeeded
yet. I have read SPARK-23207 and SPARK-28699, but couldn't find a matching bug.
Since this case uses the Dataset API, I believe it is unrelated to SPARK-24243.

Can anyone give me some advice on how to track down this issue?
Thanks in advance.

Shiao-An Yuan
