Cool, thanks! Very helpful.

On Fri, 20 Dec 2019 at 6:53 pm, Enrico Minack <m...@enrico.minack.dev> wrote:
> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
>
> On 19.12.19 at 23:33, Chris Teoh wrote:
>
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine, so to confirm, compare the DAGs generated by both
> approaches and see if they're identical.
>
> On Fri, 20 Dec 2019, 8:56 am ayan guha, <guha.a...@gmail.com> wrote:
>
>> Quick question: why is it better to use one SQL statement vs. multiple
>> withColumn calls? Isn't everything eventually rewritten by Catalyst?
>>
>> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev>
>> wrote:
>>
>>> How many withColumn statements do you have? Note that it is better to
>>> use a single select rather than lots of withColumn calls. This also
>>> makes the drops redundant.
>>>
>>> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores
>>> is really slow. Can you try this on a single machine, i.e. run with
>>> "local[*]"?
>>>
>>> Can you rule out the writing part by counting the rows? I presume this
>>> all happens in a single stage.
>>>
>>> Enrico
>>>
>>> On 18.12.19 at 10:56, Antoine DUBOIS wrote:
>>>
>>> Hello
>>>
>>> I'm working on an ETL that transforms CSVs describing file systems into
>>> Parquet, so I can work on them easily to extract information.
>>> I'm using Mr. Powers' Daria framework to do so. I have quite different
>>> inputs and a lot of transformations, and the framework helps organize
>>> the code.
>>> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores
>>> and 32GB of memory each.
>>> Storage is handled by a CephFS volume mounted on all nodes.
>>> First, a short description of my algorithm (it's quite simple):
>>>
>>> Use SparkContext to load the csv.bz2 file,
>>> Chain a lot of withColumn() statements,
>>> Drop all unnecessary columns,
>>> Write the Parquet file to CephFS
>>>
>>> This treatment can take several hours depending on how many lines the
>>> CSV has, and I wanted to identify whether bz2 or the network could be
>>> an issue, so I ran the following tests (several times, with consistent
>>> results). I tried the following scenario with 20 cores and 2 cores per
>>> task:
>>>
>>> - Read the csv.bz2 from CephFS over a 1 Gb/s connection per node:
>>> ~5 minutes.
>>> - Read the csv.bz2 from TMPFS (set up to look like a shared storage
>>> space): ~5 minutes.
>>> - From the two previous tests I concluded that uncompressing the file
>>> was part of the bottleneck, so I uncompressed the file and stored it in
>>> TMPFS as well; result: ~5.9 minutes.
>>>
>>> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
>>> uncompressed. These results have been reproduced several times each.
>>> My question is: what is the bottleneck in this case?
>>>
>>> I thought that the uncompressed file in RAM would be the fastest. Is it
>>> possible that my program reads the CSV suboptimally?
>>> In the execution logs on the cluster I see 5 to 10 seconds of GC time
>>> at most, and the timeline shows mainly CPU time (no shuffling, no
>>> randomization overhead either).
>>> I also noticed that memory storage is never used during the execution.
>>> I know from several hours of research that bz2 is the only real
>>> compression algorithm usable as an input in Spark, for parallelization
>>> reasons.
>>>
>>> Do you have any idea why it behaves this way, and any idea how to
>>> improve this treatment?
>>>
>>> Cheers
>>>
>>> Antoine
>>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
--
Best Regards,
Ayan Guha
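
To illustrate Enrico's advice above, here is a minimal Scala sketch of the
two approaches (a chain of withColumn calls vs. a single select), plus the
row count he suggests for ruling out the write, run with "local[*]". The
paths, column names, and transformations are hypothetical placeholders and
not Antoine's actual Daria pipeline.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CsvToParquetSketch {
  def main(args: Array[String]): Unit = {
    // "local[*]" as suggested above, to rule out cluster/storage effects.
    val spark = SparkSession.builder()
      .appName("csv-to-parquet-sketch")
      .master("local[*]")
      .getOrCreate()

    // Hypothetical input path; replace with the real CephFS location.
    val df = spark.read
      .option("header", "true")
      .csv("/mnt/cephfs/filesystems.csv.bz2")

    // Many chained withColumn calls: each call creates a new Dataset with
    // its own projection, which adds analysis overhead (see the Medium
    // article linked above).
    val chained = df
      .withColumn("size_mb", col("size_bytes") / (1024 * 1024))
      .withColumn("is_large", col("size_mb") > 100)
      // ... dozens more withColumn calls ...

    // Equivalent single select: one projection, and because only the
    // needed columns are listed, the later drop() calls become redundant.
    val selected = df.select(
      col("path"),
      (col("size_bytes") / (1024 * 1024)).as("size_mb"),
      (col("size_bytes") / (1024 * 1024) > 100).as("is_large")
      // ... only the columns actually needed ...
    )

    // Rule out the write by counting first (forces a full read of the CSV).
    println(s"rows: ${selected.count()}")

    // Hypothetical output path on CephFS.
    selected.write.mode("overwrite").parquet("/mnt/cephfs/output.parquet")

    spark.stop()
  }
}

Comparing the plans of `chained` and `selected` with explain() is one way to
check Chris's point about whether Catalyst produces identical DAGs for both.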