I asked this question on Stack Overflow, but it looks too complex for a Q&A 
resource.
https://stackoverflow.com/questions/68236323/spark-aqe-post-shuffle-partitions-coalesce-dont-work-as-expected-and-even-make
So I want to ask for help here.

I use a global sort on my Spark DataFrame, and when I enable AQE and 
post-shuffle coalesce, my partitions after the sort operation become even more 
unevenly distributed than before.
```
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
```
My query, at a high level, looks like this:
```
spark
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
```
 1. Column which can cause skew -> yes, my data is not well distributed, that's 
why I use salts.
 2. I read data from Kafka, so I use the Kafka partition + offset columns as 
salt.
 3. Why doesn't sort, which uses repartitionByRange under the hood, help me, and 
why do I want to enable AQE? -> Right now I see that my Kafka messages can 
differ too much in size. So my partitions after the range repartition have 
nearly the same number of records, but are still very uneven in bytes.
 4. Why do I think AQE must help me? -> I want to create many small ranges 
which, even with my data skew, will not be more than ~50mb, so post-shuffle 
coalesce will be able to coalesce them to the target size (256mb). In my case 
spikes up to 320mb are ok.
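
The idea in point 4 can be sketched in plain Python. This is only a simplified illustration of greedy coalescing of adjacent partitions toward a target size, not Spark's actual `ShufflePartitionsUtil.coalescePartitions` code; the function name `coalesce_contiguous` and the input sizes are made up for the example.

```python
from typing import List, Tuple

MB = 1024 * 1024


def coalesce_contiguous(sizes: List[int], target_size: int) -> List[Tuple[int, int]]:
    """Greedily group adjacent partitions into [start, end) index ranges.

    A new group starts whenever adding the next partition would push the
    current group above target_size.
    """
    groups = []
    start = 0
    current = 0
    for i, size in enumerate(sizes):
        if i > start and current + size > target_size:
            groups.append((start, i))
            start = i
            current = size
        else:
            current += size
    if sizes:
        # The trailing group takes whatever is left, so it may be small.
        groups.append((start, len(sizes)))
    return groups


# Hypothetical input: 12 small ranges of 50 MB each, target size 256 MB.
sizes = [50 * MB] * 12
groups = coalesce_contiguous(sizes, 256 * MB)
group_sizes_mb = [sum(sizes[s:e]) // MB for s, e in groups]
print(groups)          # index ranges of the coalesced partitions
print(group_sizes_mb)  # size of each coalesced partition in MB
```

With small and roughly even inputs, every group except the last one lands close to the target, which is the behaviour I expect from post-shuffle coalesce.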

My first assumption was that even a small range could contain too big a spike.
But I checked and confirmed that repartitioning by range gives me a good 
distribution in records, but a bad one in size. I have nearly 200 partitions 
with nearly the same number of records and size differences of up to 9x, from 
~100mb to ~900mb.
But with AQE and repartitioning to 18000 small ranges, the smallest partition 
was 18mib and the biggest 1.8gib.
This state of things is much worse than without AQE.
It's important to highlight that I use metrics from the Spark UI -> Details for 
Stage tab to identify partition sizes in bytes, and I have my own logs for 
record counts.
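
To make the comparison concrete, here is the arithmetic on the numbers above as a small Python sketch. The helper name `skew_ratio` is made up for illustration; the inputs are the approximate sizes I quoted.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3


def skew_ratio(smallest: float, biggest: float) -> float:
    """Ratio between the biggest and the smallest partition size."""
    return biggest / smallest


# Without AQE: ~100 MB smallest, ~900 MB biggest (same unit on both sides).
without_aqe = skew_ratio(100, 900)
# With AQE and 18000 small ranges: 18 MiB smallest, 1.8 GiB biggest.
with_aqe = skew_ratio(18 * MIB, 1.8 * GIB)

print(round(without_aqe, 1), round(with_aqe, 1))
```

So the spread goes from roughly 9x without AQE to roughly 100x with it.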


So I started to debug the issue, but AQE doesn't have enough logging on the 
input and output of 
`ShufflePartitionsUtil.coalescePartitions`.
That's why I rewrote my query to repartitionByRange.sortWithinPartitions and 
[forked the physical plan optimization with additional logging][1].
My logs show me that my initial idea was right.
 * Input partitions after the map and shuffle-write stage were split to be 
small enough
 * The coalesce algorithm collects them into a good number of partitions, well 
distributed in bytes.
```
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
```
And
```
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition  maxsize :312832323
Output partition min size :103832323
```
The min size is so different because of the size of the last partition, which 
is expected. The TRACE log level shows that 99% of partitions are near 290mib.

 * But why does the Spark UI show such different results? ->
 * Could the Spark UI be wrong? ->
 * Maybe, but besides the size, the task duration is also too long, which 
makes me think the Spark UI is ok.
 * So my assumption is an issue with `MapOutputStatistics` in my stage. But is 
it always broken, or only in my case? ->
 * Only in my case? -> I made a few checks to confirm it.
 * * I read the same dataset from s3 (parquet files with block size 120mb) -> 
and AQE works as expected. Post-shuffle coalesce returns 188 partitions, well 
distributed by size. It's important to note that the data on s3 is not well 
distributed, but Spark splits it during reading into 259 partitions of nearly 
120mb each, most likely because of the 120mb parquet block size.
 * * I read the same dataset from Kafka, but excluded the column with the skew 
from the partition function -> and AQE works as expected. Post-shuffle coalesce 
returns 203 partitions, well distributed by size.
 * * I tried to disable the cache -> this had no effect. I use the cache only 
to avoid reading from Kafka twice, because repartition by range uses sampling.
 * * I tried to disable AQE and write 18000 partitions to s3 -> the result was 
as expected and the same as what my log of the coalescing input shows: 17999 
files, the smallest near 8mib and the biggest 56mib.

 * All these checks make me think that `MapOutputStatistics` is wrong only in 
my case. It may be an issue related to the Kafka source, or to my Kafka input 
data being very unevenly distributed.
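
To illustrate what I suspect (this is only my hypothesis, not a confirmed Spark behaviour), here is a small Python sketch of what would happen if the sizes reported to the coalesce step differ from the real sizes. The greedy grouping is a simplified stand-in, not Spark's actual code, and all names and numbers are made up.

```python
from typing import List, Tuple


def coalesce_contiguous(sizes: List[int], target_size: int) -> List[Tuple[int, int]]:
    """Greedily group adjacent partitions into [start, end) index ranges."""
    groups = []
    start = 0
    current = 0
    for i, size in enumerate(sizes):
        if i > start and current + size > target_size:
            groups.append((start, i))
            start = i
            current = size
        else:
            current += size
    if sizes:
        groups.append((start, len(sizes)))
    return groups


def group_totals(sizes: List[int], groups: List[Tuple[int, int]]) -> List[int]:
    """Total size of every coalesced group, measured with the given sizes."""
    return [sum(sizes[s:e]) for s, e in groups]


# Hypothetical: the statistics report 8 partitions of 10 units each,
# but partitions 0-3 really hold 40 units each.
reported = [10] * 8
actual = [40, 40, 40, 40, 10, 10, 10, 10]

groups = coalesce_contiguous(reported, target_size=40)
print(group_totals(reported, groups))  # what the coalesce step believes
print(group_totals(actual, groups))    # what the tasks would really read
```

In this toy case the groups look perfectly balanced according to the reported sizes, while the real sizes differ by 4x, which is the kind of gap I see between my coalesce logs and the Spark UI.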

Questions:
 * Does anyone have an idea what I am doing wrong? And what does AQE do with 
skewed partitions?
 * And what can I do with the input data to make post-shuffle coalesce work in 
my case?
 * If you think that I am right, please leave a comment about it.

P.S. 
I also want to mention that my input Kafka DataFrame has 2160 unevenly 
distributed partitions -> some partitions can be 2x bigger than others. 
I read from a Kafka topic with 720 partitions and the `minPartitions` option 
set to 720 * 3.


  [1]: https://gist.github.com/GrigorievNick/2f77b26719e46c544e3f20aa488