Jesuino created SPARK-51533:
-------------------------------

             Summary: Performance Degradation with mapInPandas in Spark 3.5.*
                 Key: SPARK-51533
                 URL: https://issues.apache.org/jira/browse/SPARK-51533
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.5, 3.5.4, 3.5.3, 3.5.2, 3.5.1, 3.5.0
            Reporter: Jesuino


After upgrading to Spark 3.5.*, I noticed a significant performance degradation 
when using `mapInPandas` for computationally intensive tasks, in this case 
computing SHAP values in parallel. Performance was consistent across Spark 
versions 3.1 through 3.4, but with Spark 3.5 execution time increased 
substantially.

h1. Minimal Reproducible Example

I've created a [minimal reproducible 
example|https://gist.github.com/jesuinovieira/610b28c99b00c108a170c7a276943d3b] 
to isolate the issue as much as I could. Below are the execution times per SHAP 
iteration using this code:
 
||Model||Size (MB)||Spark 3.4.4 (s/it)||Spark 3.5.0 (s/it)||
|lgb-s|20|1|5|
|lgb-m|52|2.5|13|
|lgb-l|110|5|40|

As shown, execution time has increased by approximately 5-8x after upgrading to 
Spark 3.5.

{code:python}
import time
import os
import sys
import findspark
import pandas as pd
import shap
import lightgbm as lgb
import requests
from typing import Iterable
from sklearn.model_selection import train_test_split

findspark.init()
os.environ["PYSPARK_PYTHON"] = sys.executable

import pyspark.sql
import pyspark.sql.types as T


def explain(df, model, background_data):
    def compute_shap(iterable: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
        for i, batch in enumerate(iterable):
            if i > 0:
                break

            explainer.shap_values(batch, silent=False)
            yield pd.DataFrame(columns=["dummy"])

    explainer = shap.KernelExplainer(
        model=model.predict,
        data=background_data,
        keep_index=True,
        link="identity",
    )

    print("Computing shap values")
    t1 = time.time()

    schema = T.StructType([T.StructField("dummy", T.IntegerType())])
    shap_values = df.mapInPandas(compute_shap, schema=schema)
    shap_values.collect()

    t2 = time.time()
    print(f"Elapsed time: {round(t2 - t1, 2)} seconds")


conf = pyspark.SparkConf().setAppName("bug")
# Set maxRecordsPerBatch to 1 since we are interested in a single iteration
conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1")
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# NOTE: set size to train lgb model with different number of estimators
# s: n_estimator=1000, m: n_estimators=2500, l: n_estimators=5000
size = "s"

# Download the dataset if it doesn't exist
url = "https://raw.githubusercontent.com/saul-chirinos/Electricity-demand-forecasting-in-Panama/master/Data/continuous%20dataset.csv"
filename = "panama.csv"

if not os.path.isfile(filename):
    response = requests.get(url)
    response.raise_for_status()
    with open(filename, "wb") as file:
        file.write(response.content)

# Load data
data = pd.read_csv(filename).drop(columns=["datetime", "QV2M_san", "T2M_san", "T2M_toc"])
X, y = data.drop(columns=["nat_demand"]), data["nat_demand"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
params = {"n_estimators": 1000 if size == "s" else 2500 if size == "m" else 5000, "num_leaves": 256}
train, test = lgb.Dataset(X_train, label=y_train), lgb.Dataset(X_test, label=y_test)
predictor = lgb.train(params=params, train_set=train, valid_sets=[test])
predictor.save_model(f"lgb-{size}.txt")

# NOTE: use this for multiple runs to avoid retraining
# 
# Load model
# predictor = lgb.Booster(model_file=f"lgb-{size}.txt")

print(f"lgb-{size}: {os.path.getsize(f'lgb-{size}.txt') / (1024 * 1024):.2f} 
MB")

# Select samples for background data and to be explained
background_data = X_train.iloc[:10]
df = spark.createDataFrame(X_test.iloc[:100]).coalesce(1)

print(f"{pyspark.__version__=}")
explain(df, predictor, background_data)
{code}
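
For a non-Spark reference point, the raw KernelExplainer cost can be timed locally by appending a snippet like the sketch below to the end of the script above (it reuses `predictor`, `X_test` and `background_data`; nothing else is assumed). This helps confirm that shap/lightgbm themselves behave the same regardless of which Spark version is installed, so any difference is isolated to the `mapInPandas` path.

{code:python}
# Sketch: local (non-Spark) baseline, appended to the end of the script above.
# Times one KernelExplainer call on a single row, mirroring what compute_shap
# does per Arrow batch on the executors (maxRecordsPerBatch=1).
explainer = shap.KernelExplainer(
    model=predictor.predict,
    data=background_data,
    keep_index=True,
    link="identity",
)

row = X_test.iloc[:1]

t1 = time.time()
explainer.shap_values(row, silent=False)
t2 = time.time()
print(f"Local baseline: {round(t2 - t1, 2)} seconds for one row")
{code}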

h1. What I Tried

 * Reviewed the Spark 3.5 release notes and reverted relevant configuration changes - no impact
 * Compared logical/physical plans - no major differences
 * Analyzed execution with sparkmeasure - no notable differences (a worker-side profiling sketch to dig further is shown after this list)
 * Tested every release from 3.5.0 to 3.5.5 - the issue persists in all of them
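
Since none of the driver-side checks above showed a difference, the next step I have in mind is profiling inside the Python worker itself. A minimal sketch, wrapping the same UDF body in cProfile (it reuses `predictor`, `background_data` and `df` from the repro script; the /tmp output path is only an example):

{code:python}
# Sketch: profile the UDF body on the executors with cProfile to see whether
# the extra time is spent inside shap/lightgbm or around it (Arrow/pandas
# conversion). predictor, background_data and df come from the repro script.
import cProfile
import os
import pstats
from typing import Iterable

import pandas as pd
import shap

import pyspark.sql.types as T

explainer = shap.KernelExplainer(
    model=predictor.predict,
    data=background_data,
    keep_index=True,
    link="identity",
)


def compute_shap_profiled(iterable: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    profiler = cProfile.Profile()
    for i, batch in enumerate(iterable):
        if i > 0:
            break

        profiler.enable()
        explainer.shap_values(batch, silent=False)
        profiler.disable()

        # One stats file per worker process, written to the executor's local disk
        pstats.Stats(profiler).dump_stats(f"/tmp/shap-profile-{os.getpid()}.prof")
        yield pd.DataFrame(columns=["dummy"])


schema = T.StructType([T.StructField("dummy", T.IntegerType())])
df.mapInPandas(compute_shap_profiled, schema=schema).collect()
{code}

Comparing the cumulative time of the shap/lightgbm frames against the wall-clock time per iteration on 3.4 vs 3.5 should show whether the overhead sits inside the UDF body or in the surrounding serialization.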

h1. Questions

 * Has anyone else experienced similar slowdowns with mapInPandas in Spark 3.5.*?
 * Could this be related to changes in serialization, Arrow, or Pandas UDF internals?
 * Any suggestions on how to further diagnose or work around this issue? (One comparison I'm considering, using mapInArrow instead, is sketched after this list.)
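
For the last question, one comparison I have not run yet but that might narrow things down is to push the same computation through `mapInArrow` and convert to pandas manually, bypassing the Pandas UDF conversion layer. A sketch, again reusing `predictor`, `background_data` and `df` from the repro script:

{code:python}
# Sketch (not run yet): the same computation via mapInArrow, converting the
# Arrow batch to pandas by hand. predictor, background_data and df come from
# the repro script above.
import time
from typing import Iterable

import pyarrow as pa
import shap

import pyspark.sql.types as T

explainer = shap.KernelExplainer(
    model=predictor.predict,
    data=background_data,
    keep_index=True,
    link="identity",
)


def compute_shap_arrow(batches: Iterable[pa.RecordBatch]) -> Iterable[pa.RecordBatch]:
    for i, batch in enumerate(batches):
        if i > 0:
            break

        explainer.shap_values(batch.to_pandas(), silent=False)
        # Empty batch matching the declared "dummy" IntegerType column
        yield pa.RecordBatch.from_arrays([pa.array([], type=pa.int32())], names=["dummy"])


schema = T.StructType([T.StructField("dummy", T.IntegerType())])

t1 = time.time()
df.mapInArrow(compute_shap_arrow, schema=schema).collect()
t2 = time.time()
print(f"mapInArrow elapsed: {round(t2 - t1, 2)} seconds")
{code}

If this path is not slower on 3.5, that would point at the Arrow-to-pandas conversion in the Pandas UDF path rather than at Arrow transport or the SHAP computation itself.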

Thanks!


