Jesuino created SPARK-51533:
-------------------------------

             Summary: Performance Degradation with mapInPandas in Spark 3.5.*
                 Key: SPARK-51533
                 URL: https://issues.apache.org/jira/browse/SPARK-51533
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.5.5, 3.5.4, 3.5.3, 3.5.2, 3.5.1, 3.5.0
            Reporter: Jesuino

After upgrading to Spark 3.5.*, I noticed a significant performance degradation when using `mapInPandas` for computationally intensive tasks, in this case computing SHAP values in parallel.

Performance remained consistent across Spark versions from 3.1 to 3.4. After upgrading to Spark 3.5, however, execution time increased substantially.

h1. Minimal Reproducible Example

I've created a [minimal reproducible example|https://gist.github.com/jesuinovieira/610b28c99b00c108a170c7a276943d3b] to isolate the issue as much as I could. Below are the execution times per SHAP iteration using this code:

||Model||Size (MB)||Spark 3.4.4 (s/it)||Spark 3.5.0 (s/it)||
|lgb-s|20|1|5|
|lgb-m|52|2.5|13|
|lgb-l|110|5|40|

As shown, execution time has increased by approximately 5-8x after upgrading to Spark 3.5.

{code:python}
import time
import os
import sys
import findspark
import pandas as pd
import shap
import lightgbm as lgb
import requests
from typing import Iterable
from sklearn.model_selection import train_test_split

findspark.init()
os.environ["PYSPARK_PYTHON"] = sys.executable

import pyspark.sql
import pyspark.sql.types as T


def explain(df, model, background_data):
    def compute_shap(iterable: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
        for i, batch in enumerate(iterable):
            if i > 0:
                break
            explainer.shap_values(batch, silent=False)
            yield pd.DataFrame(columns=["dummy"])

    explainer = shap.KernelExplainer(
        model=model.predict,
        data=background_data,
        keep_index=True,
        link="identity",
    )

    print("Computing shap values")
    t1 = time.time()
    schema = T.StructType([T.StructField("dummy", T.IntegerType())])
    shap_values = df.mapInPandas(compute_shap, schema=schema)
    shap_values.collect()
    t2 = time.time()
    print(f"Elapsed time: {round(t2 - t1, 2)} seconds")


conf = pyspark.SparkConf().setAppName("bug")
# Set maxRecordsPerBatch to 1 since we are interested in a single iteration
conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1")
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()

# NOTE: set size to train lgb model with different number of estimators
# s: n_estimators=1000, m: n_estimators=2500, l: n_estimators=5000
size = "s"

# Download the dataset if it doesn't exist
url = "https://raw.githubusercontent.com/saul-chirinos/Electricity-demand-forecasting-in-Panama/master/Data/continuous%20dataset.csv"
filename = "panama.csv"
if not os.path.isfile(filename):
    response = requests.get(url)
    response.raise_for_status()
    with open(filename, "wb") as file:
        file.write(response.content)

# Load data
data = pd.read_csv(filename).drop(columns=["datetime", "QV2M_san", "T2M_san", "T2M_toc"])
X, y = data.drop(columns=["nat_demand"]), data["nat_demand"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train model
params = {"n_estimators": 1000 if size == "s" else 2500 if size == "m" else 5000, "num_leaves": 256}
train, test = lgb.Dataset(X_train, label=y_train), lgb.Dataset(X_test, label=y_test)
predictor = lgb.train(params=params, train_set=train, valid_sets=[test])
predictor.save_model(f"lgb-{size}.txt")

# NOTE: use this for multiple runs to avoid retraining
# # Load model
# predictor = lgb.Booster(model_file=f"lgb-{size}.txt")

print(f"lgb-{size}: {os.path.getsize(f'lgb-{size}.txt') / (1024 * 1024):.2f} MB")

# Select samples for background data and to be explained
background_data = X_train.iloc[:10]
df = spark.createDataFrame(X_test.iloc[:100]).coalesce(1)

print(f"{pyspark.__version__=}")
explain(df, predictor, background_data)
{code}

h1. What I Tried

* Reviewed Spark 3.5 release notes and reverted relevant configuration changes - no impact
* Checked logical/physical plans - no major differences
* Analyzed execution with sparkmeasure - no notable differences
* Tested with all versions from 3.5.0 to 3.5.5 - the issue persists in every release

h1. Questions

* Has anyone else experienced similar slowdowns in Spark 3.5.* with mapInPandas?
* Could this be related to changes in serialization, Arrow, or Pandas UDF internals?
* Any suggestions on how to further diagnose or work around this issue?

Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org