Manu,
thank you very much for your response. 

1. Your post helps to further optimize Spark jobs for wide data:
(https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015)
The suggested code change:

// Build all projections in a single .select instead of chaining .withColumn calls.
df.select(df.columns.map { col =>
  df(col).isNotNull
}: _*)

provides much better performance than the previous approach (where we looped
over the initial columns, calling the .withColumn method for each one). The
difference becomes astonishing when using the current Spark master
(2.4.0-SNAPSHOT). Please see the results of our measurements in the plot below.
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/t3091/loop-withColumn_VS_select-map-2.png>
 
The combination of the recent improvement in the Catalyst optimizer and the
more efficient code makes a game-changing difference: the job duration becomes
linear in the number of columns. The test jobs can process 40K columns in less
than 20 seconds. In contrast, before these optimizations (see the code in the
previous posts), the jobs could not process more than 1600 columns, and even
that took minutes. For reference, a sketch of the .withColumn-based variant we
compared against is shown below.
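The .withColumn-based variant looked roughly like this (a sketch of the
pattern only; the exact test code is in our previous posts):

// Previous approach: chain one .withColumn call per column. Every call
// produces a new Dataset with its own projection, so the analysis cost
// grows with the number of columns.
val result = df.columns.foldLeft(df) { (acc, colName) =>
  acc.withColumn(colName, acc(colName).isNotNull)
}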

2.  CACHING

Manu Zhang wrote
>>For 2, I guess `cache` will break up the logical plan and force it be
>>analyzed.

According to this explanation, caching does not break the logical plan:

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md#cache-and-checkpoint
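
For what it is worth, a quick way to check this locally is to compare the
plans Spark prints with and without caching (a sketch, assuming a
SparkSession `spark` and a small CSV at `path`):

val df = spark.read.csv(path)
df.explain(true)    // plans before caching

df.cache()
df.count()          // the first action materializes the cache
df.explain(true)    // plans after caching; the cached relation appears here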

The structure of our test jobs with caching (see our previous posts for the
code and results) is very basic:
.read csv -> .cache -> .count
compared to:
.read csv -> .count
Adding the caching step increases the job duration significantly. This is
especially critical in Spark-2.4.0-SNAPSHOT: without caching, the job duration
is linear in the number of columns, but it becomes polynomial when caching is
added. The CSV files used for testing are approx. 2 MB, so accommodating them
in memory should not be a problem. As far as we understand, this is not the
expected behavior of caching. A sketch of both job variants is shown below.
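
In code, the two variants are roughly as follows (a sketch, assuming a
SparkSession `spark` and a CSV path `path`; the option settings are
illustrative):

// Variant 1: with caching
val cached = spark.read.option("header", "true").csv(path).cache()
cached.count()   // the extra cost shows up here

// Variant 2: without caching
val plain = spark.read.option("header", "true").csv(path)
plain.count()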

3. 

antonkulaga  wrote
>> did you try to test somewhing more complex, like dataframe.describe or
>> PCA?

Anton Kulaga,
we use dataframe.describe mostly for debugging purposes. Its execution takes
additional time, but we did not measure it, because it is typically not
included in the production jobs.
We also did not test PCA transformations. It would be very interesting if you
could share your observations/measurements for those.

CONCLUSION:
-Using .withColumn has a high cost in the Catalyst optimizer. The alternative
approach, using .select with a map over the columns, reduces job duration
dramatically and enables processing tables with tens of thousands of columns.
-It would be interesting to further investigate how the cost of caching
depends on the number of columns.


