I haven’t read the code yet, but when you invoke spark-submit, where are you 
specifying --master yarn --deploy-mode client? Is it in the default config file 
and are you sure that spark-submit is reading the right file?
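
For reference (illustrative values only), the master and deploy mode are normally set
either on the spark-submit command line:

  spark-submit --master yarn --deploy-mode client ... your_script.py

or in conf/spark-defaults.conf, which spark-submit picks up from SPARK_CONF_DIR /
$SPARK_HOME/conf unless you override it with --properties-file:

  spark.master             yarn
  spark.submit.deployMode  client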

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Thursday, June 16, 2016 11:45 AM
To: Mohammed Guller
Cc: user
Subject: Re: Spark SQL driver memory keeps rising

I'm using pyspark and running in YARN client mode. I managed to anonymize the 
code a bit and pasted it below.

You'll notice that I don't collect any output in the driver, instead the data 
is written to parquet directly. Also notice that I increased 
spark.driver.maxResultSize to 10g because the job was failing with the error 
"serialized results of x tasks (xxxx) is bigger than spark.driver.maxResultSize 
(xxx)", which means the tasks were sending something to the driver, and that's 
probably what's causing the driver memory usage to keep rising. This happens at 
stages that read/write shuffled data (as opposed to input stages).

I also noticed this message appear many times in the output "INFO: 
MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 
4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 nodes (r3.2xlarge, 61 GB memory), and the master node is r3.4xlarge (122 
GB memory)
- 1.4 TB json logs split across ~70k files in S3


# spark-submit --driver-memory 100g --executor-memory 44g --executor-cores 8 \
#     --conf spark.yarn.executor.memoryOverhead=9000 --num-executors 199 \
#     log_cleansing.py

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setAppName('log_cleansing') \
    .set("spark.driver.maxResultSize", "10g") \
    .set("spark.hadoop.spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") \
    .set("spark.hadoop.mapred.output.committer.class",
         "org.apache.hadoop.mapred.DirectFileOutputCommitter") \
    .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# raw data is stored in s3 and is split across 10s of thousands of files
# (each file is about 20MB, total data set size is around 1.4 TB)
# so we coalesce to a manageable number of partitions and store it in hdfs
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")

# now read the json data set from hdfs
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# this cleans the date, pulls in only events within two days of their
# ingestion ts, and eliminates pure duplicates
clean_date_and_gap = sqlContext.sql(
 '''SELECT DISTINCT t.*
    FROM json_logs t
    WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'),
                   to_utc_timestamp(`timestamp`, 'UTC')) < 2
      AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap, "clean_date_and_gap")
clean_date_and_gap.cache()

# extract unique event instanceIds and their min ts
distinct_intanceIds = sqlContext.sql(
 '''SELECT
      instanceId,
      min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
    FROM clean_date_and_gap
    GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_intanceIds, "distinct_intanceIds")

# create table with only the min ingestion ts records
deduped_day = sqlContext.sql(
 '''SELECT t.*
    FROM clean_date_and_gap t
    JOIN distinct_intanceIds d
      ON (d.instanceId = t.instanceId AND
          to_utc_timestamp(`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# drop weird columns
deduped_day = deduped_day.drop('weird column name1') \
                         .drop('weird column name2') \
                         .drop('weird column name3')

# standardize all column names so that Parquet is happy
# rename() adds clean alias names to hide problematic column names
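
# NOTE: rename() was stripped out of this anonymized paste; a minimal sketch,
# assuming it only aliases each column to a Parquet-safe name (hypothetical
# helper, not necessarily the actual implementation), might look like:
import re
from pyspark.sql.functions import col

def rename(names):
    # alias each original column to a name with the characters Parquet
    # rejects (" ,;{}()\n\t=") replaced by underscores
    return [col(n).alias(re.sub(r"[ ,;{}()\n\t=]", "_", n)) for n in names]
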
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))

sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()

clean_date_and_gap.unpersist()

# load prior days' instanceIds for deduping purposes
last1_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine the overlap from prior days
overlap_from_prior_days = sqlContext.sql(
 '''SELECT DISTINCT
      d.Instanceid,
      case when y.Instanceid is not null then 1 else 0 end as seen_before
    FROM deduped_day_1 d
    LEFT JOIN prior_instanceIds y
      ON (y.Instanceid=d.Instanceid)''')
sqlContext.registerDataFrameAsTable(overlap_from_prior_days,'overlap_from_prior_days')

# save the final data out to a parquet file
final_cleaned_data = sqlContext.sql(
 '''SELECT d.*
    FROM deduped_day_1 d
    JOIN overlap_from_prior_days o
      ON (d.Instanceid = o.Instanceid)
    WHERE o.seen_before = 0''')

final_cleaned_data = final_cleaned_data.coalesce(200)
final_cleaned_data.write.parquet("s3://cleaned_data/today.parquet")


On Wed, Jun 15, 2016 at 10:23 PM, Mohammed Guller <moham...@glassbeam.com> wrote:
It would be hard to guess what could be going on without looking at the code.
It sounds like the driver program goes into a long stop-the-world GC pause.
This should not happen on the machine running the driver program if all you
are doing is reading data from HDFS, performing a bunch of transformations,
and writing the results back into HDFS.

Perhaps the program is not actually running Spark in cluster mode, but in
local mode?
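
One quick way to rule that out (just a minimal check for a pyspark script; the
exact value depends on how the context was created) is to print what the
SparkContext actually connected to, right after it is created:

print(sc.master)  # expect something like "yarn-client" or "yarn", not "local[*]"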

Mohammed
Author: Big Data Analytics with Spark <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Tuesday, June 14, 2016 10:23 PM
To: user
Subject: Spark SQL driver memory keeps rising

I'm having trouble with a Spark SQL job in which I run a series of SQL 
transformations on data loaded from HDFS.

The first two stages, which load the input from HDFS, run without issues, but
later stages that require shuffles cause the driver memory to keep rising
until it is exhausted. At that point the driver stalls, the Spark UI stops
responding, and I can't even kill the driver with ^C; I have to forcibly kill
the process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB,
and spark.driver.memoryOverhead is 4.5 GB. When I look at memory usage, the
driver memory before the shuffle starts is at about 2.4 GB (virtual memory
size for the driver process is about 50 GB). Once the stages that require a
shuffle start, I can see the driver memory rising fast to about 47 GB, at
which point everything stops responding.

I'm not invoking any output operation that collects data at the driver. I just 
call .cache() on a couple of dataframes since they get used more than once in 
the SQL transformations, but those should be cached on the workers. Then I 
write the final result to a parquet file, but the job doesn't get to this final 
stage.

What could possibly be causing the driver memory to rise that fast when no data 
is being collected at the driver?

Thanks,
Khaled
