What I missed earlier: try increasing the number of partitions using repartition.
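
For example, a minimal sketch (the partition count is illustrative, and
alldat_idx is the cached DataFrame from the snippet below):

alldat_idx = alldat_idx.repartition(2000)  # more, smaller shuffle tasks
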
On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:

> It does not look like a Scala vs Python thing. How big is your audience
> data store? Can it be broadcast?
>
> What memory footprint are you seeing? At what point is YARN killing the
> containers? Depending on that, you may want to tweak the number of
> partitions of the input dataset and increase the number of executors.
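>
> For example, a broadcast-join sketch (assuming the audience table is
> small enough to fit in each executor's memory; aud_str is from the
> snippet below):
>
> from pyspark.sql.functions import broadcast
>
> alldata_aud = all_data_long.join(broadcast(aud_str),
>         all_data_long['segment'] == aud_str['entry'],
>         'left_outer')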
>
> Ayan
>
>
> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
> wrote:
>
>> Hello,
>>
>> I'm trying to build an ETL job which takes in 30-100 GB of text data and
>> prepares it for SparkML. I don't speak Scala, so I've been trying to
>> implement it in PySpark on YARN, Spark 2.1.
>>
>> Despite the transformations being fairly simple, the job always fails by
>> running out of executor memory.
>>
>> The input table is long (~6bn rows) but composed of three simple columns:
>>
>> #####################################################################
>> all_data_long.printSchema()
>>
>> root
>>  |-- id: long (nullable = true)
>>  |-- label: short (nullable = true)
>>  |-- segment: string (nullable = true)
>>
>> #####################################################################
>>
>> First I join it to a table of the particular segments of interest and do
>> an aggregation,
>>
>> #####################################################################
>>
>> audiences.printSchema()
>>
>> root
>>  |-- entry: integer (nullable = true)
>>  |-- descr: string (nullable = true)
>>
>>
>> print("Num in adl: {}".format(str(all_data_long.count())))
>>
>> from pyspark.ml.feature import StringIndexer
>> from pyspark.sql.functions import collect_list
>>
>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>         audiences['descr'])
>>
>> alldata_aud = all_data_long.join(aud_str,
>>         all_data_long['segment'] == aud_str['entry'],
>>         'left_outer')
>>
>> str_idx  = StringIndexer(inputCol='segment', outputCol='indexedSegs')
>>
>> idx_df   = str_idx.fit(alldata_aud)
>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')
>>
>> id_seg = (label_df
>>         .filter(label_df.descr.isNotNull())
>>         .groupBy('id')
>>         .agg(collect_list('descr')))
>>
>> id_seg.write.saveAsTable("hive.id_seg")
>>
>> #####################################################################
>>
>> Then I use that StringIndexer again on the first data frame to featurize
>> the segment ID:
>>
>> #####################################################################
>>
>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label','label_val')
>>
>> #####################################################################
>>
>>
>> My ultimate goal is to make a SparseVector, so I group the indexed
>> segments by id and try to cast them into a vector:
>>
>> #####################################################################
>>
>> from pyspark.ml.linalg import Vectors, VectorUDT
>> from pyspark.sql.functions import udf, max, lit, sort_array  # max shadows the builtin
>>
>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen,
>>         {v: 1.0 for v in l}), VectorUDT())
>>
>> alldat_idx.cache()
>>
>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>
>> print("alldat_dix: {}".format(str(alldat_idx.count())))
>>
>> feature_df = (alldat_idx
>>         .withColumn('label', alldat_idx['label_val'].cast('double'))
>>         .groupBy('id', 'label')
>>         .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>>         .withColumn('num_feat', lit(feature_vec_len))
>>         .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
>>         .drop('collect_list_is')
>>         .drop('num_feat'))
>>
>> feature_df.cache()
>> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <-
>> failure occurs here
>>
>> #####################################################################
>>
>> Here, however, I always run out of memory on the executors (I've twiddled
>> driver and executor memory to check) and YARN kills off my containers. I've
>> gone as high as --executor-memory 15g but it still doesn't help.
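>>
>> For reference, the knobs I've been tweaking look roughly like this
>> (values are illustrative; spark.yarn.executor.memoryOverhead is the
>> off-heap allowance YARN enforces on each container, which is where the
>> Python workers that run the UDF live):
>>
>> from pyspark.sql import SparkSession
>>
>> spark = (SparkSession.builder
>>         .config('spark.executor.memory', '15g')
>>         .config('spark.yarn.executor.memoryOverhead', '4096')  # in MB
>>         .getOrCreate())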
>>
>> Given that the number of segments is at most 50,000, I'm surprised that a
>> smallish row-wise operation is enough to blow up the process.
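>>
>> To put a number on "smallish": with the 56845-wide feature space from the
>> plans below, a single row's vector stores only its active entries (the
>> indices here are made up):
>>
>> from pyspark.ml.linalg import Vectors
>>
>> v = Vectors.sparse(56845, {12: 1.0, 31: 1.0})  # a row with two segments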
>>
>>
>> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
>>
>>
>>
>>
>>
>> Query plans for the failing stage:
>>
>> #####################################################################
>>
>>
>> == Parsed Logical Plan ==
>> Aggregate [count(1) AS count#265L]
>> +- Project [id#0L, label#183, features#208]
>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>                         +- MetastoreRelation pmccarthy, all_data_long
>>
>> == Analyzed Logical Plan ==
>> count: bigint
>> Aggregate [count(1) AS count#265L]
>> +- Project [id#0L, label#183, features#208]
>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>                         +- MetastoreRelation pmccarthy, all_data_long
>>
>> == Optimized Logical Plan ==
>> Aggregate [count(1) AS count#265L]
>> +- Project
>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>          +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>             +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>                +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>                   +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>                      +- Exchange hashpartitioning(id#0L, label#183, 200)
>>                         +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                            +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>                                  +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                                        +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>                                           +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#284L])
>>       +- InMemoryTableScan
>>             +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                   +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>                      +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>                         +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>                            +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>                               +- Exchange hashpartitioning(id#0L, label#183, 200)
>>                                  +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>                                     +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>                                           +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                                                 +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>                                                    +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>
>>
>> --
> Best Regards,
> Ayan Guha
>
-- 
Best Regards,
Ayan Guha
