Quick question: why is it better to use a single select rather than multiple withColumn calls? Isn't everything eventually rewritten by Catalyst anyway?
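
For concreteness, here is a minimal sketch of the two styles in question (Scala; the DataFrame and column names are made up for illustration, not taken from the actual job):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WithColumnVsSelect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
    import spark.implicits._

    val df = Seq(("a", 1024), ("b", 2048)).toDF("name", "size")

    // Style 1: one withColumn per derived column; each call returns a new
    // DataFrame with an extra projection in the logical plan.
    val viaWithColumn = df
      .withColumn("size_kb", col("size") / 1024)
      .withColumn("name_upper", upper(col("name")))
      .drop("size")

    // Style 2: a single select that expresses the same result in one
    // projection, which also makes the drop unnecessary.
    val viaSelect = df.select(
      col("name"),
      (col("size") / 1024).as("size_kb"),
      upper(col("name")).as("name_upper")
    )

    // Compare the plans Catalyst produces for each style.
    viaWithColumn.explain(true)
    viaSelect.explain(true)

    spark.stop()
  }
}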
On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev> wrote:

> How many withColumn statements do you have? Note that it is better to use
> a single select rather than lots of withColumn calls. This also makes the
> drops redundant.
>
> Reading 25M CSV lines and writing to Parquet in 5 minutes on 32 cores is
> really slow. Can you try this on a single machine, i.e. run with "local[*]"?
>
> Can you rule out the writing part by counting the rows? I presume this all
> happens in a single stage.
>
> Enrico
>
>
> On 18.12.19 at 10:56, Antoine DUBOIS wrote:
>
> Hello,
>
> I'm working on an ETL based on CSVs describing file systems, transforming
> them into Parquet so I can work on them easily and extract information.
> I'm using Mr. Powers' Daria framework to do so. I have quite varied inputs
> and a lot of transformations, and the framework helps organize the code.
> I have a standalone cluster (v2.3.2) composed of 4 nodes with 8 cores and
> 32 GB of memory each.
> Storage is handled by a CephFS volume mounted on all nodes.
> First, a short description of my algorithm (it's quite simple):
>
> - Use the SparkContext to load the csv.bz2 file,
> - Chain a lot of withColumn() statements,
> - Drop all unnecessary columns,
> - Write the Parquet file to CephFS.
>
> This treatment can take several hours depending on how many lines the CSV
> has. I wanted to identify whether bz2 or the network could be an issue,
> so I ran the following tests (several times, with consistent results),
> with 20 cores and 2 cores per task:
>
> - Read the csv.bz2 from CephFS with a 1 Gb/s connection on each
> node: ~5 minutes.
> - Read the csv.bz2 from tmpfs (set up to look like a shared storage
> space): ~5 minutes.
> - From the two previous tests I concluded that uncompressing the file
> was part of the bottleneck, so I uncompressed the file and stored it
> in tmpfs as well; result: ~5.9 minutes.
>
> The test file has 25'833'369 lines and is 370 MB compressed and 3700 MB
> uncompressed. These results have been reproduced several times each.
> My question here is: what is the bottleneck in this case?
>
> I thought that the uncompressed file in RAM would be the fastest. Is it
> possible that my program reads the CSV suboptimally?
> In the execution logs on the cluster I see 5 to 10 seconds of GC time at
> most, and the timeline shows mainly CPU time (no shuffling, and no
> randomization overhead either).
> I also noticed that memory storage is never used during the execution. I
> know from several hours of research that bz2 is the only real compression
> codec usable as a splittable input in Spark, for parallelization reasons.
>
> Do you have any idea why it behaves this way,
> and how to improve this treatment?
>
> Cheers,
>
> Antoine

--
Best Regards,
Ayan Guha
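
PS: a rough sketch of the count-only check along the lines Enrico suggests, to separate the read/decompress cost from the Parquet write. The path and CSV options below are placeholders, not taken from the actual setup:

import org.apache.spark.sql.SparkSession

object CountOnlyCheck {
  def main(args: Array[String]): Unit = {
    // Run on a single machine with all local cores, as suggested above.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("count-only-check")
      .getOrCreate()

    // Placeholder path; Spark decompresses .bz2 based on the file extension.
    val df = spark.read
      .option("header", "true")   // adjust to the actual file layout
      .csv("/tmp/filesystem-dump.csv.bz2")

    val start = System.nanoTime()
    val rows = df.count()         // forces a full read without writing Parquet
    val seconds = (System.nanoTime() - start) / 1e9

    println(s"$rows rows counted in $seconds s")
    spark.stop()
  }
}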