Hello,

I'm trying to build an ETL job which takes in 30-100 GB of text data and
prepares it for SparkML. I don't speak Scala, so I've been trying to
implement it in PySpark on YARN with Spark 2.1.

Despite the transformations being fairly simple, the job always fails by
running out of executor memory.

The input table is long (~6bn rows) but composed of three simple values:

#####################################################################
all_data_long.printSchema()

root
|-- id: long (nullable = true)
|-- label: short (nullable = true)
|-- segment: string (nullable = true)

#####################################################################

First I join it to a table of particular segments of interest and do an
aggregation:

#####################################################################

audiences.printSchema()

root
 |-- entry: integer (nullable = true)
 |-- descr: string (nullable = true)


from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import collect_list

print("Num in adl: {}".format(str(all_data_long.count())))

aud_str = audiences.select(audiences['entry'].cast('string'),
        audiences['descr'])

alldata_aud = all_data_long.join(aud_str,
        all_data_long['segment']==aud_str['entry'],
        'left_outer')

str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')

idx_df   = str_idx.fit(alldata_aud)
label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')

id_seg = (label_df
        .filter(label_df.descr.isNotNull())
        .groupBy('id')
        .agg(collect_list('descr')))

id_seg.write.saveAsTable("hive.id_seg")

#####################################################################

Then I reuse the fitted StringIndexer model on the first data frame to
featurize the segment ID:

#####################################################################

alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label','label_val')

#####################################################################
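
In case it helps anyone reading along: my understanding is that
StringIndexer maps each distinct string to a double index, with the most
frequent value getting 0.0, which is why I take the max index plus one as
the vector length further down. Here's a toy plain-Python sketch of that
idea. The function name is made up for illustration, this isn't Spark's
implementation, and tie-breaking between equally frequent values may differ:

```python
from collections import Counter

def toy_string_index(values):
    """Hypothetical stand-in for a fitted StringIndexer: the most frequent
    value gets 0.0, the next most frequent gets 1.0, and so on."""
    ranked = Counter(values).most_common()
    return {value: float(rank) for rank, (value, _count) in enumerate(ranked)}

# Made-up segment IDs, purely for illustration.
segments = ['101', '202', '101', '303', '101', '202']
mapping = toy_string_index(segments)

# Max index plus one equals the number of distinct segments (as a float).
toy_vec_len = max(mapping.values()) + 1

print(mapping)
print(toy_vec_len)
```

Because the indices are doubles, the vector length also comes out as a
double, which matches the 56845.0 literal in the query plans below.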


My ultimate goal is to make a SparseVector, so I group the indexed segments
by id and try to convert each group into a vector:

#####################################################################

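# Imports assumed for this snippet (pyspark.ml rather than mllib vector types).
# Note that this `max` is the Spark SQL function and shadows the builtin.
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import lit, max, sort_array, udf

list_to_sparse_udf = udf(
        lambda l, maxlen: Vectors.sparse(maxlen, {v: 1.0 for v in l}),
        VectorUDT())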

alldat_idx.cache()

feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)

print("alldat_idx: {}".format(str(alldat_idx.count())))

feature_df = (alldat_idx
        .withColumn('label',alldat_idx['label_val'].cast('double'))
        .groupBy('id','label')
        .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
        .withColumn('num_feat',lit(feature_vec_len))
        .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
        .drop('collect_list_is')
        .drop('num_feat'))

feature_df.cache()
print("Num in featuredf: {}".format(str(feature_df.count())))  ## <- failure occurs here

#####################################################################

Here, however, I always run out of memory on the executors (I've twiddled
driver and executor memory to check) and YARN kills off my containers. I've
gone as high as --executor-memory 15g but it still doesn't help.

Given the number of segments is at most 50,000 I'm surprised that a
smallish row-wise operation is enough to blow up the process.
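
For a sense of scale, here is a rough back-of-envelope calculation. It
assumes the rows spread evenly across the 200 shuffle partitions shown in
the Exchange step of the query plans below (they may not, if some ids are
much more common than others), and it assumes 8 bytes per column for the
three columns that get shuffled, ignoring any serialization or object
overhead:

```python
# Back-of-envelope: how much data lands in each shuffle partition?
# Row count and partition count come from this post; even distribution
# and the per-row byte size are assumptions.
TOTAL_ROWS = 6 * 10**9        # ~6bn rows in all_data_long
SHUFFLE_PARTITIONS = 200      # from "Exchange hashpartitioning(id, label, 200)"

# Assumed raw payload per shuffled row: id (long), indexedSegs (double),
# label (double), 8 bytes each.
BYTES_PER_ROW = 8 + 8 + 8

rows_per_partition = TOTAL_ROWS // SHUFFLE_PARTITIONS
raw_gib_per_partition = rows_per_partition * BYTES_PER_ROW / float(2**30)

print("rows per partition: {:,}".format(rows_per_partition))
print("raw GiB per partition: {:.2f}".format(raw_gib_per_partition))
```

So even in the best case each task in the aggregation stage sees about 30
million rows, before counting whatever overhead the deserialized rows and
the collected lists add on top.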


Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
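
To make the question concrete, this is roughly the work the UDF does for
each row, written as plain Python with no Spark involved. The helper name is
made up and it returns a plain tuple rather than a real SparseVector, so
treat it as a sketch only:

```python
def toy_list_to_sparse(indices, size):
    """Hypothetical plain-Python analogue of the lambda in list_to_sparse_udf.

    Returns (size, sorted unique indices, values) instead of a SparseVector.
    """
    entries = {int(v): 1.0 for v in indices}  # duplicate indices collapse to one key
    ordered = sorted(entries)
    return int(size), ordered, [entries[i] for i in ordered]

# Made-up input: one id whose collected list contains a repeated index.
size, idx, vals = toy_list_to_sparse([3.0, 1.0, 3.0], 5.0)
print(size, idx, vals)
```

As far as I can tell the per-row cost is proportional to the length of the
collected list, so the UDF on its own should only be expensive if some ids
have very long lists.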





Query plans for the failing stage:

#####################################################################


== Parsed Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
      +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
<lambda>(collect_list_is#197, num_feat#202) AS features#208]
         +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
            +- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
               +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
                     +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
                        +- MetastoreRelation pmccarthy, all_data_long

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#265L]
+- Project [id#0L, label#183, features#208]
   +- Project [id#0L, label#183, num_feat#202, features#208]
      +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
<lambda>(collect_list_is#197, num_feat#202) AS features#208]
         +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
num_feat#202]
            +- Aggregate [id#0L, label#183], [id#0L, label#183,
sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
               +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                  +- Project [id#0L, label#1 AS label_val#99, segment#2,
indexedSegs#93]
                     +- Project [id#0L, label#1, segment#2,
UDF(cast(segment#2 as string)) AS indexedSegs#93]
                        +- MetastoreRelation pmccarthy, all_data_long

== Optimized Logical Plan ==
Aggregate [count(1) AS count#265L]
+- Project
   +- InMemoryRelation [id#0L, label#183, features#208], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
            +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)],
[id#0L, label#183, collect_list_is#197, pythonUDF0#244]
               +- SortAggregate(key=[id#0L, label#183],
functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
collect_list_is#197])
                  +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
FIRST], false, 0
                     +- Exchange hashpartitioning(id#0L, label#183, 200)
                        +- *Project [id#0L, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                           +- InMemoryTableScan [id#0L, indexedSegs#93,
label_val#99]
                                 +- InMemoryRelation [id#0L, label_val#99,
segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory,
deserialized, 1 replicas)
                                       +- *Project [id#0L, label#1 AS
label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
                                          +- HiveTableScan [id#0L, label#1,
segment#2], MetastoreRelation pmccarthy, all_data_long

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)],
output=[count#284L])
      +- InMemoryTableScan
            +- InMemoryRelation [id#0L, label#183, features#208], true,
10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *Project [id#0L, label#183, pythonUDF0#244 AS
features#208]
                     +- BatchEvalPython [<lambda>(collect_list_is#197,
56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
                        +- SortAggregate(key=[id#0L, label#183],
functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
collect_list_is#197])
                           +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC
NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(id#0L,
label#183, 200)
                                 +- *Project [id#0L, indexedSegs#93,
cast(label_val#99 as double) AS label#183]
                                    +- InMemoryTableScan [id#0L,
indexedSegs#93, label_val#99]
                                          +- InMemoryRelation [id#0L,
label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
memory, deserialized, 1 replicas)
                                                +- *Project [id#0L, label#1
AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
                                                   +- HiveTableScan [id#0L,
label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
