[ https://issues.apache.org/jira/browse/SPARK-47520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17914587#comment-17914587 ]
William Montaz commented on SPARK-47520:
----------------------------------------

Hello,

I see no activity on this bug report, but we are certain there is an issue with UnsafeRow and a repartition performed right after a GROUP BY involving a SUM of floats, in the case of a fetch failure exception, because the ordering of rows matters for float addition. That order changes when a retry is performed, so the recomputed UnsafeRow differs (by a single bit shift), and the sort applied to the UnsafeRow before the repartition then assigns it to a different partition. Since only a few partitions are retried, the row ends up belonging to two partitions: the one that succeeded and the new retried one. The same problem also makes some rows disappear, because they no longer belong to the retried partition. I can share more detail if required, in case my explanation is not clear enough, but I would like some indication that this ticket has been seen. Thanks

> Precision issues with sum of floats/doubles leads to incorrect data after
> repartition stage retry
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-47520
>                 URL: https://issues.apache.org/jira/browse/SPARK-47520
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.2, 3.3.2, 3.5.0
>            Reporter: William Montaz
>            Priority: Major
>              Labels: correctness
>
> We discovered an important correctness issue directly linked to SPARK-47024.
> Even though SPARK-47024 was resolved as 'Not a Problem' because it concerns
> float and double precision, it can still have drastic impact when combined
> with spark.sql.execution.sortBeforeRepartition set to true (the default).
> We consistently reproduced the issue with a GROUP BY and a SUM of float or
> double aggregation, followed by a repartition (a common pattern to produce
> bigger output files, triggered either by SQL hints or by extensions such as
> Kyuubi).
> If the repartition stage fails with a FetchFailedException for only a few
> tasks, Spark decides to recompute the upstream partitions whose output could
> not be fetched, and retries only the failed partitions downstream.
> Because the block fetch order is nondeterministic, the recomputed upstream
> partition can produce a slightly different value for a float/double SUM
> aggregation. In all of our attempts we observed a 1-bit difference in the
> byte array backing the UnsafeRow. The sort performed before the repartition
> uses UnsafeRow.hashCode for the row prefix, which is completely different
> even with such a 1-bit difference, so the sort order in the recomputed
> upstream partition is completely different, and therefore the target
> downstream partitions of the shuffled rows are completely different as well.
> Because the sort becomes nondeterministic and only the failed downstream
> tasks are retried, the resulting repartition produces duplicate rows as well
> as missing rows. The protection brought by SPARK-23207 is broken here.
> So far, we can only disable spark.sql.execution.sortBeforeRepartition to
> make the entire job fail instead of producing incorrect data. The current
> default therefore leads to a silent correctness issue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
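The root cause the report relies on, the non-associativity of floating-point addition, can be reproduced in a few lines of plain Python (an illustrative sketch, not from the ticket; the same holds for Java/Scala doubles):

```python
import struct

# The same three doubles summed in two different orders, as an upstream task
# and its retry might do when the shuffle block fetch order changes.
sum_a = (0.1 + 0.2) + 0.3
sum_b = (0.3 + 0.2) + 0.1

# Reinterpret each result's IEEE 754 encoding as a 64-bit integer.
bits_a = struct.unpack('<Q', struct.pack('<d', sum_a))[0]
bits_b = struct.unpack('<Q', struct.pack('<d', sum_b))[0]

print(sum_a == sum_b)    # False: the two orders give different results
print(bits_a - bits_b)   # 1: the encodings differ by one unit in the last place
```

Because the two encodings are adjacent doubles, the serialized row bytes produced by the original attempt and the retry are almost identical, yet not equal.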
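The downstream effect described in the report, a tiny value change rerouting the row to a different partition, can be sketched as follows. This is an assumption-laden illustration: CRC32 stands in for the Murmur3-based UnsafeRow.hashCode that Spark actually computes over the row's backing bytes, and NUM_PARTITIONS is a hypothetical shuffle partition count:

```python
import struct
import zlib

NUM_PARTITIONS = 200  # hypothetical shuffle partition count

def partition_for(row_bytes: bytes) -> int:
    # Hash-based partition assignment; CRC32 is only a stand-in here for the
    # Murmur3 hash Spark computes over an UnsafeRow's backing byte array.
    return zlib.crc32(row_bytes) % NUM_PARTITIONS

# The two one-ULP-apart sums from the retry scenario, encoded as row bytes.
row_original = struct.pack('<d', (0.1 + 0.2) + 0.3)
row_retry    = struct.pack('<d', (0.3 + 0.2) + 0.1)

# A one-ULP change in the value changes the hash entirely, so the "same" row
# is very likely routed to a different partition on retry.
print(zlib.crc32(row_original) != zlib.crc32(row_retry))  # True
print(partition_for(row_original), partition_for(row_retry))
```

Since only the failed downstream tasks rerun, a row rerouted this way is written by both the surviving old output and the retried one (a duplicate), while the partition it left never regains it (a missing row), which is the corruption the ticket describes.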