What I missed: also try increasing the number of partitions using repartition.
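E.g., before the groupBy (a sketch; the partition count is a placeholder to tune, and alldat_idx is the cached DataFrame from your code below):

    # Spread the aggregation over more, smaller tasks so each fits in memory.
    alldat_idx = alldat_idx.repartition(2000)

On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote: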
> It does not look like a Scala-vs-Python thing. How big is your audience
> dataset? Can it be broadcast?
>
> What is the memory footprint you are seeing? At what point is YARN
> killing the containers? Depending on that, you may want to tweak the
> number of partitions of the input dataset and increase the number of
> executors.
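> For the broadcast, a minimal sketch (assuming the audiences table is
> small enough to fit on each executor; names taken from your code below):
>
>     from pyspark.sql.functions import broadcast
>
>     # Ship the small dimension table to every executor instead of
>     # shuffling the ~6bn-row fact table for the join.
>     alldata_aud = all_data_long.join(broadcast(aud_str),
>                                      all_data_long['segment'] == aud_str['entry'],
>                                      'left_outer')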
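> On the partitions/executors side, something like this (all numbers are
> placeholders to tune, and etl_job.py is a hypothetical script name):
>
>     spark-submit \
>       --num-executors 50 \
>       --executor-memory 8g \
>       --conf spark.sql.shuffle.partitions=2000 \
>       etl_job.py
>
> The 200 in Exchange hashpartitioning(..., 200) in your plan is the
> default spark.sql.shuffle.partitions; raising it shrinks each task's
> slice of the groupBy.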
> Ayan
>
> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to build an ETL job which takes in 30-100 GB of text data and
>> prepares it for SparkML. I don't speak Scala, so I've been trying to
>> implement it in PySpark on YARN, Spark 2.1.
>>
>> Despite the transformations being fairly simple, the job always fails by
>> running out of executor memory.
>>
>> The input table is long (~6bn rows) but composed of three simple columns:
>>
>> #####################################################################
>> all_data_long.printSchema()
>>
>> root
>>  |-- id: long (nullable = true)
>>  |-- label: short (nullable = true)
>>  |-- segment: string (nullable = true)
>>
>> #####################################################################
>>
>> First I join it to a table of particular segments of interest and do an
>> aggregation:
>>
>> #####################################################################
>>
>> audiences.printSchema()
>>
>> root
>>  |-- entry: integer (nullable = true)
>>  |-- descr: string (nullable = true)
>>
>> # Imports implied by the snippets below (ml.linalg assumed, per the
>> # SparkML target).
>> from pyspark.ml.feature import StringIndexer
>> from pyspark.ml.linalg import Vectors, VectorUDT
>> from pyspark.sql.functions import collect_list, lit, max, sort_array, udf
>>
>> print("Num in adl: {}".format(str(all_data_long.count())))
>>
>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>                            audiences['descr'])
>>
>> alldata_aud = all_data_long.join(aud_str,
>>                                  all_data_long['segment'] == aud_str['entry'],
>>                                  'left_outer')
>>
>> str_idx = StringIndexer(inputCol='segment', outputCol='indexedSegs')
>>
>> idx_df = str_idx.fit(alldata_aud)
>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label', 'label_val')
>>
>> id_seg = (label_df
>>           .filter(label_df.descr.isNotNull())
>>           .groupBy('id')
>>           .agg(collect_list('descr')))
>>
>> id_seg.write.saveAsTable("hive.id_seg")
>>
>> #####################################################################
>>
>> Then I use that StringIndexer again on the first DataFrame to featurize
>> the segment ID:
>>
>> #####################################################################
>>
>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label', 'label_val')
>>
>> #####################################################################
>>
>> My ultimate goal is to make a SparseVector, so I group the indexed
>> segments by id and try to cast them into a vector:
>>
>> #####################################################################
>>
>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v: 1.0 for v in l}),
>>                          VectorUDT())
>>
>> alldat_idx.cache()
>>
>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>
>> print("alldat_idx: {}".format(str(alldat_idx.count())))
>>
>> feature_df = (alldat_idx
>>               .withColumn('label', alldat_idx['label_val'].cast('double'))
>>               .groupBy('id', 'label')
>>               .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>>               .withColumn('num_feat', lit(feature_vec_len))
>>               .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
>>               .drop('collect_list_is')
>>               .drop('num_feat'))
>>
>> feature_df.cache()
>> print("Num in featuredf: {}".format(str(feature_df.count())))  # <- failure occurs here
>>
>> #####################################################################
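>> To illustrate what the UDF builds for a single hypothetical row (indices
>> are the StringIndexer outputs, every value 1.0):
>>
>>     # l = [0.0, 3.0], maxlen = 5
>>     Vectors.sparse(5, {0.0: 1.0, 3.0: 1.0})   # SparseVector(5, {0: 1.0, 3: 1.0})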
>> Here, however, I always run out of memory on the executors (I've twiddled
>> driver and executor memory to check) and YARN kills off my containers.
>> I've gone as high as --executor-memory 15g but it still doesn't help.
>>
>> Given that the number of segments is at most 50,000, I'm surprised that a
>> smallish row-wise operation is enough to blow up the process.
>>
>> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
>>
>> Query plans for the failing stage:
>>
>> #####################################################################
>>
>> == Parsed Logical Plan ==
>> Aggregate [count(1) AS count#265L]
>> +- Project [id#0L, label#183, features#208]
>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>                         +- MetastoreRelation pmccarthy, all_data_long
>>
>> == Analyzed Logical Plan ==
>> count: bigint
>> Aggregate [count(1) AS count#265L]
>> +- Project [id#0L, label#183, features#208]
>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>                         +- MetastoreRelation pmccarthy, all_data_long
>>
>> == Optimized Logical Plan ==
>> Aggregate [count(1) AS count#265L]
>> +- Project
>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>       +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>          +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>             +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>                +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>                   +- Exchange hashpartitioning(id#0L, label#183, 200)
>>                      +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                         +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>                            +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                               +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>                                  +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#284L])
>>       +- InMemoryTableScan
>>          +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>             +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>                +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>                   +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>                      +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>                         +- Exchange hashpartitioning(id#0L, label#183, 200)
>>                            +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                               +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>                                  +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                                     +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>                                        +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>
>> #####################################################################
>>
>> --
> Best Regards,
> Ayan Guha

--
Best Regards,
Ayan Guha