The partitions helped!

I added repartition() and my function looks like this now:

feature_df = (alldat_idx
        .withColumn('label',alldat_idx['label_val'].cast('double'))
        .groupBy('id','label')

.agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
        .repartition(1000)
        .withColumn('num_feat',lit(feature_vec_len))

.withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
        .drop('collect_list_is')
        .drop('num_feat'))

I got a few failed containers for memory overflow, but the job was able to
finish successfully. I tried upping the repartition as high as 4000 but a
few still failed.

For posterity's sake, where would I look for the footprint you have in
mind? On the executor tab?

Since the audience part of the task finished successfully and the failure
was on a df that didn't touch it, it shouldn't've made a difference.

Thank you!

On Sat, Apr 15, 2017 at 9:07 PM, ayan guha <guha.a...@gmail.com> wrote:

> What i missed is try increasing number of partitions using repartition
>
> On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:
>
>> It does not look like scala vs python thing. How big is your audience
>> data store? Can it be broadcasted?
>>
>> What is the memory footprint you are seeing? At what point yarn is
>> killing? Depeneding on that you may want to tweak around number of
>> partitions of input dataset and increase number of executors
>>
>> Ayan
>>
>>
>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm trying to build an ETL job which takes in 30-100gb of text data and
>>> prepares it for SparkML. I don't speak Scala so I've been trying to
>>> implement in PySpark on YARN, Spark 2.1.
>>>
>>> Despite the transformations being fairly simple, the job always fails by
>>> running out of executor memory.
>>>
>>> The input table is long (~6bn rows) but composed of three simple values:
>>>
>>> #####################################################################
>>> all_data_long.printSchema()
>>>
>>> root
>>> |-- id: long (nullable = true)
>>> |-- label: short (nullable = true)
>>> |-- segment: string (nullable = true)
>>>
>>> #####################################################################
>>>
>>> First I join it to a table of particular segments of interests and do an
>>> aggregation,
>>>
>>> #####################################################################
>>>
>>> audiences.printSchema()
>>>
>>> root
>>>  |-- entry: integer (nullable = true)
>>>  |-- descr: string (nullable = true)
>>>
>>>
>>> print("Num in adl: {}".format(str(all_data_long.count())))
>>>
>>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>>         audiences['descr'])
>>>
>>> alldata_aud = all_data_long.join(aud_str,
>>>         all_data_long['segment']==aud_str['entry'],
>>>         'left_outer')
>>>
>>> str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')
>>>
>>> idx_df   = str_idx.fit(alldata_aud)
>>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','
>>> label_val')
>>>
>>> id_seg = (label_df
>>>         .filter(label_df.descr.isNotNull())
>>>         .groupBy('id')
>>>         .agg(collect_list('descr')))
>>>
>>> id_seg.write.saveAsTable("hive.id_seg")
>>>
>>> #####################################################################
>>>
>>> Then, I use that StringIndexer again on the first data frame to
>>> featurize the segment ID
>>>
>>> #####################################################################
>>>
>>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('
>>> label','label_val')
>>>
>>> #####################################################################
>>>
>>>
>>> My ultimate goal is to make a SparseVector, so I group the indexed
>>> segments by id and try to cast it into a vector
>>>
>>> #####################################################################
>>>
>>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
>>> for v in l}),VectorUDT())
>>>
>>> alldat_idx.cache()
>>>
>>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>>
>>> print("alldat_dix: {}".format(str(alldat_idx.count())))
>>>
>>> feature_df = (alldat_idx
>>>         .withColumn('label',alldat_idx['label_val'].cast('double'))
>>>         .groupBy('id','label')
>>>         .agg(sort_array(collect_list('indexedSegs')).alias('collect_
>>> list_is'))
>>>         .withColumn('num_feat',lit(feature_vec_len))
>>>         .withColumn('features',list_to_sparse_udf('collect_list_
>>> is','num_feat'))
>>>         .drop('collect_list_is')
>>>         .drop('num_feat'))
>>>
>>> feature_df.cache()
>>> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <-
>>> failure occurs here
>>>
>>> #####################################################################
>>>
>>> Here, however, I always run out of memory on the executors (I've
>>> twiddled driver and executor memory to check) and YARN kills off my
>>> containers. I've gone as high as —executor-memory 15g but it still doesn't
>>> help.
>>>
>>> Given the number of segments is at most 50,000 I'm surprised that a
>>> smallish row-wise operation is enough to blow up the process.
>>>
>>>
>>> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
>>>
>>>
>>>
>>>
>>>
>>> Query plans for the failing stage:
>>>
>>> #####################################################################
>>>
>>>
>>> == Parsed Logical Plan ==
>>> Aggregate [count(1) AS count#265L]
>>> +- Project [id#0L, label#183, features#208]
>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
>>> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
>>> num_feat#202]
>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
>>> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS
>>> collect_list_is#197]
>>>                +- Project [id#0L, label_val#99, segment#2,
>>> indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2,
>>> indexedSegs#93]
>>>                      +- Project [id#0L, label#1, segment#2,
>>> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Analyzed Logical Plan ==
>>> count: bigint
>>> Aggregate [count(1) AS count#265L]
>>> +- Project [id#0L, label#183, features#208]
>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
>>> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
>>> num_feat#202]
>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
>>> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS
>>> collect_list_is#197]
>>>                +- Project [id#0L, label_val#99, segment#2,
>>> indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2,
>>> indexedSegs#93]
>>>                      +- Project [id#0L, label#1, segment#2,
>>> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Optimized Logical Plan ==
>>> Aggregate [count(1) AS count#265L]
>>> +- Project
>>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000,
>>> StorageLevel(disk, memory, deserialized, 1 replicas)
>>>          +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>             +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)],
>>> [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>                +- SortAggregate(key=[id#0L, label#183],
>>> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L,
>>> label#183, collect_list_is#197])
>>>                   +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
>>> FIRST], false, 0
>>>                      +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>                         +- *Project [id#0L, indexedSegs#93,
>>> cast(label_val#99 as double) AS label#183]
>>>                            +- InMemoryTableScan [id#0L, indexedSegs#93,
>>> label_val#99]
>>>                                  +- InMemoryRelation [id#0L,
>>> label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
>>> memory, deserialized, 1 replicas)
>>>                                        +- *Project [id#0L, label#1 AS
>>> label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>                                           +- HiveTableScan [id#0L,
>>> label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>>> +- Exchange SinglePartition
>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)],
>>> output=[count#284L])
>>>       +- InMemoryTableScan
>>>             +- InMemoryRelation [id#0L, label#183, features#208], true,
>>> 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>                   +- *Project [id#0L, label#183, pythonUDF0#244 AS
>>> features#208]
>>>                      +- BatchEvalPython [<lambda>(collect_list_is#197,
>>> 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>                         +- SortAggregate(key=[id#0L, label#183],
>>> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L,
>>> label#183, collect_list_is#197])
>>>                            +- *Sort [id#0L ASC NULLS FIRST, label#183
>>> ASC NULLS FIRST], false, 0
>>>                               +- Exchange hashpartitioning(id#0L,
>>> label#183, 200)
>>>                                  +- *Project [id#0L, indexedSegs#93,
>>> cast(label_val#99 as double) AS label#183]
>>>                                     +- InMemoryTableScan [id#0L,
>>> indexedSegs#93, label_val#99]
>>>                                           +- InMemoryRelation [id#0L,
>>> label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
>>> memory, deserialized, 1 replicas)
>>>                                                 +- *Project [id#0L,
>>> label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>                                                    +- HiveTableScan
>>> [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to