Hi Rob,

As a matter of interest, do you have a ballpark figure for the
percentage of queries that end up with a skewed distribution?

Thanks

Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

   view my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>





On Mon, 27 Jan 2025 at 17:24, Rob Reeves <robert.p.ree...@gmail.com> wrote:

> The counting does use count-min sketch and publishes the top K keys above
> a skew threshold to an accumulator. The core implementation in my prototype
> is in InlineApproxCountExec
> <https://github.com/robreeves/spark/pull/1/files#diff-3f324e7c94939ee254c49b3456a1685129fe6b01e2243343b666b8da6cc23b94R319>.
> The logical operator I added is InlineApproxCount
> <https://github.com/robreeves/spark/pull/1/files#diff-ae6713092c998474a5a8d6450aae3f06ad6c405ad608419a1aaa86fca2739311R27>.
> It can be used explicitly, or analyzer rules can inject it so that skew
> identification can be done during debugging without code changes.
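>
> As a rough, hypothetical sketch of the accumulator side (not the prototype
> code, which lives in the PR above), each task could report its locally hot
> keys and the driver could merge them into one top-K view, assuming String
> keys and a (key, count) input type:
>
> import org.apache.spark.util.AccumulatorV2
> import scala.collection.mutable
>
> // Illustrative only: merges per-task (key, count) pairs and keeps an
> // approximate top K. Dropping the tail on trim makes the counts
> // approximate, which is fine for the debugging use case.
> class TopKeysAccumulator(k: Int)
>     extends AccumulatorV2[(String, Long), Map[String, Long]] {
>   private val counts = mutable.Map.empty[String, Long]
>
>   override def isZero: Boolean = counts.isEmpty
>
>   override def copy(): TopKeysAccumulator = {
>     val acc = new TopKeysAccumulator(k)
>     acc.counts ++= counts
>     acc
>   }
>
>   override def reset(): Unit = counts.clear()
>
>   override def add(v: (String, Long)): Unit = {
>     counts(v._1) = counts.getOrElse(v._1, 0L) + v._2
>     trim()
>   }
>
>   override def merge(
>       other: AccumulatorV2[(String, Long), Map[String, Long]]): Unit =
>     other.value.foreach { case (key, n) => add((key, n)) }
>
>   override def value: Map[String, Long] = counts.toMap
>
>   // Keep only the k largest counts seen so far.
>   private def trim(): Unit = if (counts.size > k) {
>     val keep = counts.toSeq.sortBy(-_._2).take(k)
>     counts.clear()
>     counts ++= keep
>   }
> }
>
> // Register on the driver before use, e.g.:
> // val acc = new TopKeysAccumulator(10)
> // spark.sparkContext.register(acc, "topJoinKeys")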
>
> On Fri, Jan 24, 2025 at 3:05 PM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> OK, so the Catalyst optimizer will use this inline key counting to give
>> the Spark optimizer advance notice, so that it identifies the hot keys?
>> What is the inline key counting based on? Likely the Count-Min Sketch
>> algorithm!
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>>
>>    view my LinkedIn profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>>
>>
>> On Fri, 24 Jan 2025 at 21:17, Rob Reeves <robert.p.ree...@gmail.com>
>> wrote:
>>
>>> Hi Spark devs,
>>>
>>> I recently worked on a prototype to make it easier to identify the root
>>> cause of data skew in Spark. I wanted to see if the community was
>>> interested in it before working on contributing the changes (SPIP and PRs).
>>>
>>> *Problem*
>>> When a query has data skew today, all you see is outlier tasks taking a
>>> long time. The natural next question is what is causing the skew, but
>>> answering that takes a lot of manual debugging (e.g. modifying queries,
>>> materializing intermediate datasets).
>>>
>>> *Solution*
>>> I created a new inline operator to count and surface the top K most
>>> common values for an expression using accumulators. This narrows down data
>>> skew by telling a user which subquery has skew and what value in the data
>>> is causing it. I also include an analyzer rule to enable inline key
>>> counting for all joins. The motivation here is for someone to be able to
>>> debug data skew for a production job without requiring code changes or
>>> deploying a test job.
>>>
>>> Here is what it would look like in the Spark History Server (SHS) for
>>> this example code.
>>>
>>> $ ./bin/spark-shell --conf spark.sql.debug.countJoinKeys=true
>>>
>>> val a = spark.range(10000)
>>>     .withColumn("key1", concat(lit("key_"), $"id" % 10))
>>>
>>> val b = spark.range(10000)
>>>     .withColumn("key2", concat(lit("key_"), when($"id" > 10, 10).otherwise($"id")))
>>>
>>> val c = a.join(b, $"key1" === $"key2")
>>>
>>> c.count
>>>
>>> [image: Spark History Server screenshot of the join key counts]
>>>
>>>
>>> Here is my prototype code
>>> <https://github.com/robreeves/spark/pull/1/files>. For counting, it
>>> uses the count-min sketch algorithm so the counting can be done in
>>> constant memory, reusing the count-min sketch implementation that
>>> already exists in Spark. Note that there are other performance
>>> optimizations that still need to be made before contributing it.
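>>>
>>> As a reference point, here is a minimal, hedged example of that existing
>>> implementation (org.apache.spark.util.sketch.CountMinSketch from the
>>> spark-sketch module); the eps, confidence, and seed values below are
>>> illustrative, not the prototype's settings:
>>>
>>> import org.apache.spark.util.sketch.CountMinSketch
>>>
>>> // eps bounds the relative estimation error; confidence is the
>>> // probability that the bound holds.
>>> val cms = CountMinSketch.create(0.001, 0.99, 42)
>>> Seq("key_3", "key_3", "key_7").foreach(cms.addString)
>>> // A count-min sketch never undercounts, so a high estimate is a
>>> // reliable skew signal.
>>> assert(cms.estimateCount("key_3") >= 2L)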
>>>
>>> This feature would be turned off by default, both because it has a
>>> performance impact and to prevent personally identifiable information
>>> from being leaked to the Spark History Server without the job owner's
>>> knowledge.
>>>
>>> *Extensions*
>>> Being able to identify the hot keys inline could be valuable for future
>>> optimizations, such as implementing Tree-Join (paper
>>> <https://arxiv.org/pdf/2209.08475>). Any extension beyond debugging has
>>> the prerequisite that the inline counting overhead is not significant.
>>>
>>> Thanks for taking a look!
>>>
>>> - Rob
>>>
>>>
