The partitions helped! I added repartition() and my function looks like this now:

feature_df = (alldat_idx
    .withColumn('label', alldat_idx['label_val'].cast('double'))
    .groupBy('id', 'label')
    .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
    .repartition(1000)
    .withColumn('num_feat', lit(feature_vec_len))
    .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
    .drop('collect_list_is')
    .drop('num_feat'))

I got a few failed containers from memory overflow, but the job was able to finish successfully. I tried raising the repartition count as high as 4000, but a few containers still failed.
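One thing worth noting for posterity: because the .repartition(1000) comes after the groupBy/agg, the aggregation itself still shuffles with the default 200 partitions (the "Exchange hashpartitioning(..., 200)" visible in the plans below); the wider layout only takes effect for the Python UDF step downstream. A rough sketch of widening the aggregation shuffle itself instead, assuming "spark" is the active SparkSession and treating 2000 as an arbitrary placeholder value:

from pyspark.sql import SparkSession

# Sketch only: 2000 is a placeholder, not a tested value.
# Raising spark.sql.shuffle.partitions makes the groupBy/agg itself run
# with more, smaller partitions instead of reshuffling afterwards.
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# ...then build feature_df exactly as above, with the .repartition(1000) dropped.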
For posterity's sake, where would I look for the footprint you have in mind? On the executor tab? Since the audience part of the job finished successfully and the failure was on a dataframe that didn't touch it, it shouldn't have made a difference.

Thank you!

On Sat, Apr 15, 2017 at 9:07 PM, ayan guha <guha.a...@gmail.com> wrote:

> What I missed: try increasing the number of partitions using repartition.
>
> On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:
>
>> It does not look like a Scala vs Python thing. How big is your audience
>> data store? Can it be broadcasted?
>>
>> What is the memory footprint you are seeing? At what point is YARN
>> killing? Depending on that, you may want to tweak the number of
>> partitions of the input dataset and increase the number of executors.
>>
>> Ayan
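On the broadcast question: even though the failing dataframe never touches the audience data, for reference a broadcast hint on that join would look roughly like the sketch below. broadcast() is only a hint to the optimizer, and this assumes aud_str (the string-cast audience table from the original post further down) is small enough to fit comfortably in each executor's memory.

from pyspark.sql.functions import broadcast

# Rough sketch: ship the small audience lookup to every executor so the
# join against the ~6bn-row table avoids shuffling the big side.
alldata_aud = all_data_long.join(broadcast(aud_str),
                                 all_data_long['segment'] == aud_str['entry'],
                                 'left_outer')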
>>
>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to build an ETL job which takes in 30-100 GB of text data and
>>> prepares it for SparkML. I don't speak Scala, so I've been trying to
>>> implement it in PySpark on YARN, Spark 2.1.
>>>
>>> Despite the transformations being fairly simple, the job always fails by
>>> running out of executor memory.
>>>
>>> The input table is long (~6bn rows) but composed of three simple values:
>>>
>>> #####################################################################
>>> all_data_long.printSchema()
>>>
>>> root
>>>  |-- id: long (nullable = true)
>>>  |-- label: short (nullable = true)
>>>  |-- segment: string (nullable = true)
>>>
>>> #####################################################################
>>>
>>> First I join it to a table of particular segments of interest and do an
>>> aggregation:
>>>
>>> #####################################################################
>>>
>>> audiences.printSchema()
>>>
>>> root
>>>  |-- entry: integer (nullable = true)
>>>  |-- descr: string (nullable = true)
>>>
>>> print("Num in adl: {}".format(str(all_data_long.count())))
>>>
>>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>>                            audiences['descr'])
>>>
>>> alldata_aud = all_data_long.join(aud_str,
>>>                                  all_data_long['segment'] == aud_str['entry'],
>>>                                  'left_outer')
>>>
>>> str_idx = StringIndexer(inputCol='segment', outputCol='indexedSegs')
>>>
>>> idx_df = str_idx.fit(alldata_aud)
>>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label', 'label_val')
>>>
>>> id_seg = (label_df
>>>           .filter(label_df.descr.isNotNull())
>>>           .groupBy('id')
>>>           .agg(collect_list('descr')))
>>>
>>> id_seg.write.saveAsTable("hive.id_seg")
>>>
>>> #####################################################################
>>>
>>> Then, I use that StringIndexer again on the first data frame to
>>> featurize the segment ID:
>>>
>>> #####################################################################
>>>
>>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label', 'label_val')
>>>
>>> #####################################################################
>>>
>>> My ultimate goal is to make a SparseVector, so I group the indexed
>>> segments by id and try to cast them into a vector:
>>>
>>> #####################################################################
>>>
>>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v: 1.0 for v in l}),
>>>                          VectorUDT())
>>>
>>> alldat_idx.cache()
>>>
>>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>>
>>> print("alldat_idx: {}".format(str(alldat_idx.count())))
>>>
>>> feature_df = (alldat_idx
>>>               .withColumn('label', alldat_idx['label_val'].cast('double'))
>>>               .groupBy('id', 'label')
>>>               .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>>>               .withColumn('num_feat', lit(feature_vec_len))
>>>               .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
>>>               .drop('collect_list_is')
>>>               .drop('num_feat'))
>>>
>>> feature_df.cache()
>>> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <- failure occurs here
>>>
>>> #####################################################################
>>>
>>> Here, however, I always run out of memory on the executors (I've
>>> twiddled driver and executor memory to check) and YARN kills off my
>>> containers. I've gone as high as --executor-memory 15g but it still
>>> doesn't help.
>>>
>>> Given that the number of segments is at most 50,000, I'm surprised that a
>>> smallish row-wise operation is enough to blow up the process.
>>>
>>> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
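On the Scala question: it may not be necessary. One alternative (not what the code above does) is to skip the StringIndexer/UDF pair for this step and let CountVectorizer build the sparse indicator vectors on the JVM side from the raw segment strings. A rough sketch, with the label cast to double omitted for brevity:

from pyspark.ml.feature import CountVectorizer
from pyspark.sql.functions import collect_list

# Sketch only: group the raw segment strings per id and let
# CountVectorizer assign indices and emit SparseVectors, so no Python
# UDF runs over the grouped rows. binary=True gives 0/1 indicators
# rather than counts, matching the {v: 1.0} vectors above.
grouped = (all_data_long
           .groupBy('id', 'label')
           .agg(collect_list('segment').alias('segments')))

cv = CountVectorizer(inputCol='segments', outputCol='features', binary=True)
feature_df = cv.fit(grouped).transform(grouped)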
>>>
>>> Query plans for the failing stage:
>>>
>>> #####################################################################
>>>
>>> == Parsed Logical Plan ==
>>> Aggregate [count(1) AS count#265L]
>>> +- Project [id#0L, label#183, features#208]
>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Analyzed Logical Plan ==
>>> count: bigint
>>> Aggregate [count(1) AS count#265L]
>>> +- Project [id#0L, label#183, features#208]
>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Optimized Logical Plan ==
>>> Aggregate [count(1) AS count#265L]
>>> +- Project
>>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>       +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>          +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>             +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>>                +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>>                   +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>                      +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                         +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>>                            +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>                               +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>                                  +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#284L])
>>>       +- InMemoryTableScan
>>>          +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>             +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>                +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>                   +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>>                      +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>>                         +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>                            +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                               +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>>                                  +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>                                     +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>                                        +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>
>> --
>> Best Regards,
>> Ayan Guha
>
> --
> Best Regards,
> Ayan Guha