Hi Marco,
many thanks for pointing out the related Spark commit. According to the
description, it introduces an indexed (instead of linear) search over columns
in LogicalPlan.resolve(...).
We have performed the tests on the current Spark master branch and would
like to share the results. There are some good news (see 1) and some not so
good news (see 2 and 3).

RESULTS OF TESTING CURRENT MASTER:

1. First of all, the performance of the test jobs described in the initial
post has improved dramatically. In the new version the duration grows
linearly with the number of columns (observed up to 40K columns). Please see
the plot below.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-read-count_2-3_VS_2-4.png>
 
Similar results were observed for the transformations filter, groupBy, sum,
withColumn and drop (a minimal sketch of such a job is given below). This is
a huge performance improvement which is critical for those working with wide
tables, e.g. in machine learning or when importing data from legacy systems.
Many thanks to the authors of this commit.
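
For reference, here is a minimal sketch of such a job. It is illustrative
only: the path, the format and the particular columns are placeholders, not
the exact code from the initial post.

     // read a wide table (up to 40K columns) and run simple actions
     val df = spark.read
       .option("header", "true")
       .csv("/path/to/wide_table.csv")                    // placeholder path

     df.count()                                           // read + count
     df.filter(df.col(df.columns.head).isNotNull).count() // filter
     df.withColumn("tmp", df.col(df.columns.head))        // withColumn + drop
       .drop("tmp")
       .count()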

2. When caching is added to the test jobs (.cache() right before the
.count()), the duration of the jobs increases and becomes polynomial in the
number of columns. The plot below shows the effect of caching in both Spark
2.3.1 and 2.4.0-SNAPSHOT for a better comparison.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-cache_2-3_VS_2-4.png>
 
Spark 2.4.0-SNAPSHOT completes the jobs faster than 2.3.1. However, the
reason for the polynomial complexity of caching in the number of columns is
not clear. A sketch of the cached variant is given below.
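
The cached variant differs from the sketch above only by the .cache() call
(again illustrative; df is the wide DataFrame from the previous sketch):

     // same job as above, with caching inserted right before the action
     df.cache().count()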

3. We have also performed tests with more complex transformations. Compared
to the initial test jobs, the following transformation is added:
     
     import org.apache.spark.sql.DataFrame
     import org.apache.spark.sql.types.StructField

     df.schema.fields.foldLeft(df) { // iterate over the initial columns
       case (accDf: DataFrame, attr: StructField) =>
         accDf
           .withColumn(s"${attr.name}_isNotNull", df.col(attr.name).isNotNull) // add new column
           .drop(attr.name)                                                    // remove initial column
     }.count()

It iterates over the initial columns. For each column it adds a new boolean
column indicating whether the value in the initial column is not null, and
then drops the initial column.
The measured job duration versus the number of columns is shown in the plot
below.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-loop-over-columns_2-3_VS_2-4.png>
 
The duration of such jobs has increased significantly compared to Spark
2.3.1. Again, it is polynomial in the number of columns.

CONSIDERED OPTIMIZATIONS: 
a) Disabling constraint propagation
<spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)>
decreases the duration by 15%, but does not solve the principal problem.
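
A minimal sketch of a), assuming a SparkSession named spark; the string
literal below is the key behind SQLConf.CONSTRAINT_PROPAGATION_ENABLED:

     // disable constraint propagation in the Catalyst optimizer
     spark.conf.set("spark.sql.constraintPropagation.enabled", "false")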

b) Checkpointing after every 100 columns may decrease the time (by up to 40%
in our experiments). It prunes the lineage and therefore simplifies the work
of the Catalyst optimizer. However, it comes at a high cost: the executors
have to scan over all the rows at each checkpoint. In many situations (e.g.
> 100K rows per executor, or narrow tables with < 100 columns) checkpointing
increases the overall duration. Even in the ideal case of just a few rows,
the speed-up from checkpointing is still not enough to address many tens of
thousands of columns. A sketch of this approach is given below.
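
A rough sketch of b), applied to the column-iteration job from point 3 (the
checkpoint directory and the interval of 100 columns are placeholders):

     // prune the lineage every 100 columns by materializing a checkpoint
     spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path

     df.schema.fields.zipWithIndex.foldLeft(df) {
       case (accDf, (attr, i)) =>
         val next = accDf
           .withColumn(s"${attr.name}_isNotNull", accDf.col(attr.name).isNotNull)
           .drop(attr.name)
         // truncate the logical plan after every 100 processed columns
         if ((i + 1) % 100 == 0) next.checkpoint() else next
     }.count()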

CONCLUSION:
The new improvement in the upcoming Spark 2.4 introduces an indexed search
over columns in LogicalPlan.resolve(...). It results in a great performance
improvement for basic transformations. However, there are still some
transformations which are problematic for wide data. In particular, .cache()
demonstrates polynomial complexity in the number of columns, and the
duration of jobs that iterate over columns has increased compared to the
current Spark 2.3.1. There are potentially parts of the code where the
search over columns remains linear. A discussion of further possible
optimizations is very welcome.