Seen this before; we had a very(!) complex plan behind a DataFrame, to the point
where any additional transformation went OOM on the driver.

A quick and ugly solution was to break the plan: convert the DataFrame to an RDD
and back to a DataFrame at certain points to make the plan shorter. This has
obvious drawbacks and is not recommended in general, but at least we had
something working. The real, long-term solution was to replace the many (> 200)
withColumn() calls with only a few select() calls. You can easily find sources
on the internet for why this is better. (It was on Spark 2.3, but I think the
main principles remain.) TBH, it was easier than I expected, as it mainly
involved moving pieces of code from one place to another, not a "real",
meaningful refactoring. A rough sketch of both workarounds follows.
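
A minimal sketch of both workarounds, assuming hypothetical column names
(amount, rate, ts) since the original code isn't shown:

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Break the plan: round-tripping through an RDD discards the accumulated
// logical plan, so the new DataFrame starts from a fresh scan of the RDD.
def breakPlan(spark: SparkSession, df: DataFrame): DataFrame =
  spark.createDataFrame(df.rdd, df.schema)

// Instead of chaining many withColumn calls (each one adds a projection
// to the plan), build all derived columns in a single select.
// `amount`, `rate` and `ts` are hypothetical columns.
def addDerivedColumns(df: DataFrame): DataFrame =
  df.select(
    df.columns.map(col) ++ Seq(
      (col("amount") * col("rate")).as("amount_converted"),
      col("ts").cast("date").as("event_date")
    ): _*
  )
```

Note that the optimizer cannot see through the RDD round trip (column pruning
and predicate pushdown stop at the break), so it is best treated as a stopgap
while migrating to select.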


________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Monday, May 27, 2024 15:43
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Subject: [Spark SQL] [Debug] Spark Memory Issue with DataFrame 
Processing



A few ideas off the top of my head for how to go about solving the problem:


  1.  Try with subsets: reproduce the issue with smaller subsets of your data
to pinpoint the specific operation causing the memory problems.
  2.  Explode or flatten nested structures: if your DataFrame schema involves
deep nesting, consider using techniques like explode or flattening to transform
it into a less nested structure. This can reduce memory usage during operations
like withColumn (see the sketch after this list).
  3.  Prefer a single select over chained withColumn calls: every withColumn
adds another projection to the logical plan, so long chains make the plan
itself large and expensive for the driver to analyze. Building all derived
columns in one select keeps the plan flat and can reduce driver memory usage.
  4.  spark.sql.shuffle.partitions: setting this configuration to a value close
to the number of executors can improve shuffle performance and potentially
reduce memory usage.
  5.  Spark UI monitoring: use the Spark UI to monitor memory usage throughout
your job execution and identify potential memory bottlenecks.
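
As a rough illustration of points 1, 2 and 4, with hypothetical field names
(events, event.payload.user.id, etc.) since the actual schema isn't shown:

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder()
  .appName("debug-oom")
  // Point 4: tune shuffle parallelism (the value here is a placeholder).
  .config("spark.sql.shuffle.partitions", "200")
  .getOrCreate()

def debugWithSubset(df: DataFrame): Unit = {
  // Point 1: reproduce on a small slice first.
  val sample = df.limit(10000)

  // Point 2: explode the nested array and pull the needed fields up to
  // the top level so later transformations see a flat schema.
  val flat = sample
    .select(explode(col("events")).as("event"))
    .select(
      col("event.id").as("event_id"),
      col("event.payload.user.id").as("user_id"),
      col("event.ts").as("event_ts")
    )

  flat.explain(true) // inspect the plan size before running the full job
}
```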

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


 
View my LinkedIn profile: <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but
of course cannot be guaranteed. It is essential to note that, as with any
advice, "one test result is worth one-thousand expert opinions" (Wernher von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).



On Mon, 27 May 2024 at 12:50, Gaurav Madan 
<gauravma...@urbancompany.com.invalid> wrote:
Dear Community,

I'm reaching out to seek your assistance with a memory issue we've been facing 
while processing certain large and nested DataFrames using Apache Spark. We 
have encountered a scenario where the driver runs out of memory when applying 
the `withColumn` method on specific DataFrames in Spark 3.4.1. However, the 
same DataFrames are processed successfully in Spark 2.4.0.

Problem Summary:
For certain DataFrames, applying the `withColumn` method in Spark 3.4.1 causes
the driver to run out of memory, while Spark 2.4.0 processes the same
DataFrames successfully.
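
For reference, the failing call amounts to roughly the following (the method
name is taken from the heap dump below; the column expression itself is a
guess, since the actual code isn't shown):

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, date_format}

// Sketch of DataPersistenceUtil.addStartTimeInPartitionColumn as the heap
// dump frames suggest: a single withColumn on a DataFrame with a very
// large, deeply nested schema. The partition expression is hypothetical.
def addStartTimeInPartitionColumn(df: DataFrame): DataFrame =
  df.withColumn("startTimePartition",
    date_format(col("startTime"), "yyyy-MM-dd"))
```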

Heap Dump Analysis:
We performed a heap dump analysis after enabling heap dumps on out-of-memory
errors (-XX:+HeapDumpOnOutOfMemoryError), and the analysis revealed the
following significant frames and local variables:

```

org.apache.spark.sql.Dataset.withPlan(Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:4273)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.select(Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:1622)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumns(Lscala/collection/Seq;Lscala/collection/Seq;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2820)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

org.apache.spark.sql.Dataset.withColumn(Ljava/lang/String;Lorg/apache/spark/sql/Column;)Lorg/apache/spark/sql/Dataset;
 (Dataset.scala:2759)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.addStartTimeInPartitionColumn(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)Lorg/apache/spark/sql/Dataset;
 (DataPersistenceUtil.scala:88)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.utils.DataPersistenceUtil$.writeRawData(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Ljava/lang/String;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
 (DataPersistenceUtil.scala:19)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.step.bronze.BronzeStep$.persistRecords(Lorg/apache/spark/sql/Dataset;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;)V
 (BronzeStep.scala:23)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.MainJob$.processRecords(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/factory/output/Event;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lorg/apache/spark/sql/Dataset;)V
 (MainJob.scala:78)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

com.urbanclap.dp.eventpersistence.MainJob$.processBatchCB(Lorg/apache/spark/sql/SparkSession;Lcom/urbanclap/dp/eventpersistence/JobMetadata;Lcom/urbanclap/dp/factory/output/Event;Lorg/apache/spark/sql/Dataset;)V
 (MainJob.scala:66)

org.apache.spark.sql.Dataset @ 0x680190b10 retains 4,307,840,192 (80.38%) bytes

```

Driver Configuration:
1. Driver instance: c6g.xlarge with 4 vCPUs and 8 GB RAM.
2.  `spark.driver.memory` and `spark.driver.memoryOverhead` are set to default 
values.

Observations:
- The DataFrame schema is very nested and large, which might be contributing to 
the memory issue.
- Despite similar configurations, Spark 2.4.0 processes the DataFrame without 
issues, while Spark 3.4.1 does not.

Tried Solutions:
We have tried several solutions, including disabling Adaptive Query Execution, 
setting the driver max result size, increasing driver cores, and enabling 
specific optimizer rules. However, the issue persisted until we increased the 
driver memory to 48 GB and memory overhead to 5 GB, which allowed the driver to 
schedule the tasks successfully.
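
In Spark configuration terms, what we tried amounts to roughly the following
(the max result size and core counts are placeholders, and the optimizer-rule
changes are omitted because the exact rules aren't listed here):

```
import org.apache.spark.sql.SparkSession

// Sketch of the configurations tried. Note that spark.driver.memory and
// spark.driver.memoryOverhead must be set before the driver JVM starts
// (e.g. via spark-submit --conf), not from inside the application.
val spark = SparkSession.builder()
  .config("spark.sql.adaptive.enabled", "false") // disable AQE
  .config("spark.driver.maxResultSize", "4g")    // placeholder value
  .config("spark.driver.cores", "8")             // placeholder value
  .config("spark.driver.memory", "48g")          // the change that worked
  .config("spark.driver.memoryOverhead", "5g")
  .getOrCreate()
```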

Request for Suggestions:
Are there any additional configurations or optimizations that could help 
mitigate this memory issue without always resorting to a larger machine? We 
would greatly appreciate any insights or recommendations from the community on 
how to resolve this issue effectively.

I have attached the DataFrame schema and the complete stack trace from the heap 
dump analysis for your reference.

Doc explaining the issue: <https://docs.google.com/document/d/1FL6Zeim6IN1riLH1Hp7Jw4acsBoSyWbSN13mCYnjo60/edit?usp=sharing>
DataFrame schema: <https://drive.google.com/file/d/1wgFB0_WvdQdGoEMGFePhZwLR7aQZ5fPn/view?usp=sharing>

Thank you in advance for your assistance.

Best regards,
Gaurav Madan
LinkedIn<https://www.linkedin.com/in/gaurav-madan-210b62177/>
Personal Mail: gauravmadan...@gmail.com<mailto:gauravmadan...@gmail.com>
Work Mail: gauravma...@urbancompany.com<mailto:gauravma...@urbancompany.com>
