Hi Marco, many thanks for pointing out the related Spark commit. According to its description, it introduces indexed (instead of linear) search over columns in LogicalPlan.resolve(...). We have run the tests on the current Spark master branch and would like to share the results. There is some good news (see 1) and some not so good news (see 2 and 3).
RESULTS OF TESTING CURRENT MASTER:

1. First of all, the performance of the test jobs described in the initial post has improved dramatically. In the new version the duration is linear in the number of columns (observed up to 40K columns). Please see the plot below:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-read-count_2-3_VS_2-4.png>
Similar results were observed for the transformations filter, groupBy, sum, withColumn and drop. This is a huge performance improvement, which is critical to those working with wide tables, e.g. in machine learning or when importing data from legacy systems. Many thanks to the authors of this commit.

2. When caching is added to the test jobs (.cache() right before the .count()), the job duration increases and becomes polynomial in the number of columns. The plot below shows the effect of caching in both Spark 2.3.1 and 2.4.0-SNAPSHOT for a better comparison:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-cache_2-3_VS_2-4.png>
Spark 2.4.0-SNAPSHOT completes the jobs faster than 2.3.1. However, the reason for the polynomial complexity of caching in the number of columns is not clear.

3. We have also performed tests with more complex transformations. Compared to the initial test jobs, the following transformation is added:

df.schema.fields.foldLeft(df)({ // iterate over the initial columns
  case (accDf: DataFrame, attr: StructField) => {
    accDf
      .withColumn(s"${attr.name}_isNotNull", df.col(attr.name).isNotNull) // add new column
      .drop(attr.name)                                                    // remove initial column
  }
}).count()

It iterates over the initial columns. For each column it adds a new boolean column indicating whether the value in the initial column is not null, and then drops the initial column. The measured job duration versus the number of columns is shown in the plot below:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/spark-loop-over-columns_2-3_VS_2-4.png>
The duration of such jobs has increased significantly compared to Spark 2.3.1. Again, it is polynomial in the number of columns.

CONSIDERED OPTIMIZATIONS:

a) Disabling constraint propagation (spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)) decreases the duration by 15%, but does not solve the principal problem.

b) Checkpointing after every 100 columns may decrease the time (by up to 40% in our experiments). It prunes the lineage and therefore simplifies the work of the Catalyst optimizer (a minimal sketch is given at the end of this message). However, it comes at a high cost: the executors have to scan over all the rows at each checkpoint. In many situations (e.g. > 100K rows per executor, or narrow tables with < 100 columns) checkpointing increases the overall duration. Even in the idealistic case of just a few rows, the speed-up from checkpointing is still not enough to address many tens of thousands of columns.

CONCLUSION:

The new improvement in the upcoming Spark 2.4 introduces indexed search over columns in LogicalPlan.resolve(...). It results in a great performance improvement for basic transformations. However, some transformations remain problematic for wide data. In particular, .cache() demonstrates polynomial complexity in the number of columns, and the duration of jobs featuring iteration over columns has increased compared to the current Spark 2.3.1. There are potentially parts of the code where the search over columns remains linear. A discussion of further possible optimizations is very welcome.
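APPENDIX: To make workaround (b) concrete, here is a minimal sketch of the column loop from point 3 with a checkpoint inserted after every 100 processed columns. It assumes an existing SparkSession and an input DataFrame df, that a checkpoint directory has been set via spark.sparkContext.setCheckpointDir(...), and it resolves columns against the accumulated DataFrame (with functions.col) rather than the original df so that references stay valid across checkpoints; the batch size of 100 is only illustrative and should be tuned per workload.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructField

val checkpointEvery = 100  // assumed batch size, tune per workload

val result = df.schema.fields.zipWithIndex.foldLeft(df) {
  case (accDf: DataFrame, (attr: StructField, i: Int)) =>
    val next = accDf
      .withColumn(s"${attr.name}_isNotNull", col(attr.name).isNotNull) // add new column
      .drop(attr.name)                                                 // remove initial column
    // Every `checkpointEvery` columns, materialize the intermediate result and
    // truncate the logical plan; this is what shortens Catalyst's work, at the
    // cost of a full scan over the rows at each checkpoint.
    if ((i + 1) % checkpointEvery == 0) next.checkpoint() else next
}
result.count()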