I'm using pyspark and running in YARN client mode. I managed to anonymize
the code a bit and pasted it below.

You'll notice that I don't collect any output at the driver; instead the
data is written directly to Parquet. Also notice that I increased
spark.driver.maxResultSize to 10g because the job was failing with the
error "serialized results of x tasks (xxxx) is bigger than
spark.driver.maxResultSize (xxx)", which means the tasks were sending
something to the driver, and that is probably what's causing the driver
memory usage to keep rising. This happens at stages that read/write
shuffled data (as opposed to input stages).
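
One hypothesis I can't confirm yet: Spark SQL broadcast joins build the
small side of a join by collecting it at the driver, and as far as I know
those internal collect tasks count against spark.driver.maxResultSize as
well, which could explain both the error and the memory growth even though
I never call collect() myself. A quick way to test that (just a sketch, I
haven't tried it on this job) would be to disable automatic broadcast joins
so every join becomes a shuffle join:

    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")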

I also noticed this message appear many times in the output "INFO:
MapOutputTrackerMasterEndpoint: Asked to send map output locations for
shuffle 4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's
relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 worker nodes (r3.2xlarge, 61 GB memory each); the master node is an
  r3.4xlarge (122 GB memory)
- 1.4 TB json logs split across ~70k files in S3


# spark-submit --driver-memory 100g --executor-memory 44g \
#     --executor-cores 8 --conf spark.yarn.executor.memoryOverhead=9000 \
#     --num-executors 199 log_cleansing.py

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setAppName('log_cleansing') \
    .set("spark.driver.maxResultSize", "10g") \
    .set("spark.hadoop.spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") \
    .set("spark.hadoop.mapred.output.committer.class",
         "org.apache.hadoop.mapred.DirectFileOutputCommitter") \
    .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# raw data is stored in s3 and is split across 10s of thousands of files
# (each file is about 20MB, total data set size is around 1.4 TB)
# so we coalesce to a manageable number of partitions and store it in HDFS
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")
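
# (side note on the coalesce above: since coalesce() merges partitions
#  without a shuffle, it also caps the parallelism of the S3 read itself at
#  400 tasks; if that read ever became the bottleneck, an alternative sketch
#  would be a full shuffle that keeps the ~70k read tasks, e.g.
#      text = sc.textFile("s3://json_logs/today/*/*").repartition(400)
#  though I haven't benchmarked that here)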

# now read the json data set from hdfs
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# this cleans the date, keeps only events within two days of their
# ingestion ts, and eliminates pure duplicates
clean_date_and_gap = sqlContext.sql(
 '''SELECT DISTINCT t.*
    FROM json_logs t
    WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'),
                   to_utc_timestamp(`timestamp`, 'UTC')) < 2
      AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap, "clean_date_and_gap")
clean_date_and_gap.cache()

# extract unique event instanceIds and their min ts
distinct_instanceIds = sqlContext.sql(
 '''SELECT
      instanceId,
      min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
    FROM clean_date_and_gap
    GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_instanceIds, "distinct_instanceIds")

# create table with only the min ingestion ts records
deduped_day = sqlContext.sql(
 '''SELECT t.*
    FROM clean_date_and_gap t
    JOIN distinct_instanceIds d
      ON (d.instanceId = t.instanceId AND
          to_utc_timestamp(t.`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# drop weird columns
deduped_day = deduped_day.drop('weird column name1') \
                         .drop('weird column name2') \
                         .drop('weird column name3')

# standardize all column names so that Parquet is happy
# rename() adds clean alias names to hide problematic column names
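# (rename() isn't shown in this paste; a minimal sketch of what it might
#  look like, assuming it just aliases every column to a name made of
#  alphanumerics and underscores so Parquet accepts it)
import re
from pyspark.sql.functions import col

def rename(old_cols):
    # hypothetical helper: alias each column to a Parquet-safe name
    return [col("`%s`" % c).alias(re.sub(r"[^0-9a-zA-Z_]", "_", c))
            for c in old_cols]
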
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))

sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()

clean_date_and_gap.unpersist()

# load prior days' instanceIds for deduping purposes
last1_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine the overlap from prior days
overlap_from_prior_days = sqlContext.sql(
 '''SELECT DISTINCT
      d.instanceId,
      case when y.instanceId is not null then 1 else 0 end as seen_before
    FROM deduped_day_1 d
    LEFT JOIN prior_instanceIds y
      ON (y.instanceId = d.instanceId)''')
sqlContext.registerDataFrameAsTable(overlap_from_prior_days, 'overlap_from_prior_days')

# save the final data out to a parquet file
final_cleaned_data = sqlContext.sql(
 '''SELECT d.*
    FROM deduped_day_1 d
    JOIN overlap_from_prior_days o
      ON (d.instanceId = o.instanceId)
    WHERE o.seen_before = 0''')

final_cleaned_data = final_cleaned_data.coalesce(200)
final_cleaned_data.write.parquet("s3://cleaned_data/today.parquet")
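
Side note on the last two queries: I believe the seen_before flag plus the
final join amounts to a left anti-join, so the same rows should come out of
a single left outer join with an IS NULL filter. A sketch against the same
tables (I haven't verified it produces an identical plan):

final_cleaned_data = sqlContext.sql(
 '''SELECT d.*
    FROM deduped_day_1 d
    LEFT JOIN prior_instanceIds y
      ON (y.instanceId = d.instanceId)
    WHERE y.instanceId IS NULL''')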


On Wed, Jun 15, 2016 at 10:23 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

> It would be hard to guess what could be going on without looking at the
> code. It looks like the driver program goes into a long stop-the-world GC
> pause. This should not happen on the machine running the driver program if
> all that you are doing is reading data from HDFS, performing a bunch of
> transformations, and writing the result back into HDFS.
>
>
>
> Perhaps, the program is not actually using Spark in cluster mode, but
> running Spark in local mode?
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Khaled Hammouda [mailto:khaled.hammo...@kik.com]
> *Sent:* Tuesday, June 14, 2016 10:23 PM
> *To:* user
> *Subject:* Spark SQL driver memory keeps rising
>
>
>
> I'm having trouble with a Spark SQL job in which I run a series of SQL
> transformations on data loaded from HDFS.
>
>
>
> The first two stages load data from hdfs input without issues, but later
> stages that require shuffles cause the driver memory to keep rising until
> it is exhausted. Then the driver stalls, the Spark UI stops responding,
> and I can't even kill the driver with ^C; I have to forcibly kill the
> process.
>
>
>
> I think I'm allocating enough memory to the driver: driver memory is 44
> GB, and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory
> usage, the driver memory before the shuffle starts is at about 2.4 GB
> (virtual memory size for the driver process is about 50 GB), and then once
> the stages that require a shuffle start, I can see the driver memory
> rising fast to about 47 GB, at which point everything stops responding.
>
>
>
> I'm not invoking any output operation that collects data at the driver. I
> just call .cache() on a couple of dataframes since they get used more than
> once in the SQL transformations, but those should be cached on the workers.
> Then I write the final result to a parquet file, but the job doesn't get to
> this final stage.
>
>
>
> What could possibly be causing the driver memory to rise that fast when no
> data is being collected at the driver?
>
>
>
> Thanks,
>
> Khaled
>
