Cool, thanks! Very helpful

On Fri, 20 Dec 2019 at 6:53 pm, Enrico Minack <m...@enrico.minack.dev>
wrote:

> The issue is explained in depth here:
> https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015
>
> On 19.12.19 at 23:33, Chris Teoh wrote:
>
> As far as I'm aware it isn't any better. The logic all gets processed by
> the same engine, so to confirm, compare the DAGs generated from both
> approaches and see if they're identical.
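>
> For illustration, a minimal sketch (toy columns, not the real pipeline)
> that prints the plans Spark produces for the two approaches so they can
> be compared:
>
>   import org.apache.spark.sql.SparkSession
>   import org.apache.spark.sql.functions.col
>
>   val spark = SparkSession.builder()
>     .appName("plan-compare").master("local[*]").getOrCreate()
>   import spark.implicits._
>
>   val df = Seq((1, 2), (3, 4)).toDF("a", "b")
>
>   val viaWithColumn = df
>     .withColumn("total", col("a") + col("b"))
>     .withColumn("diff", col("a") - col("b"))
>
>   val viaSelect = df.select(
>     col("a"), col("b"),
>     (col("a") + col("b")).as("total"),
>     (col("a") - col("b")).as("diff"))
>
>   // explain(true) prints the parsed, analyzed, optimized and physical
>   // plans; the physical plan is what the stage DAG is built from.
>   viaWithColumn.explain(true)
>   viaSelect.explain(true)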
>
> On Fri, 20 Dec 2019, 8:56 am ayan guha, <guha.a...@gmail.com> wrote:
>
>> Quick question: why is it better to use one SQL statement vs multiple
>> withColumn? Isn't everything eventually rewritten by Catalyst?
>>
>> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack <m...@enrico.minack.dev>
>> wrote:
>>
>>> How many withColumn statements do you have? Note that it is better to
>>> use a single select, rather than lots of withColumn. This also makes drops
>>> redundant.
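>>>
>>> For example, a rough sketch (the path and column names are placeholders,
>>> not your actual schema) of collapsing chained withColumn calls plus a
>>> drop into one select:
>>>
>>>   import org.apache.spark.sql.SparkSession
>>>   import org.apache.spark.sql.functions._
>>>
>>>   val spark = SparkSession.builder().appName("select-vs-withColumn").getOrCreate()
>>>   val raw = spark.read.option("header", "true").csv("/path/to/input.csv.bz2")
>>>
>>>   // one extra projection per withColumn call, then a drop at the end
>>>   val viaWithColumn = raw
>>>     .withColumn("size_mb", col("size_bytes") / (1024 * 1024))
>>>     .withColumn("owner_lc", lower(col("owner")))
>>>     .withColumn("mtime_ts", to_timestamp(col("mtime")))
>>>     .drop("size_bytes", "owner", "mtime")
>>>
>>>   // a single select: all derived columns in one projection, and the
>>>   // unwanted input columns are simply not listed, so no drop is needed
>>>   val viaSelect = raw.select(
>>>     col("path"),
>>>     (col("size_bytes") / (1024 * 1024)).as("size_mb"),
>>>     lower(col("owner")).as("owner_lc"),
>>>     to_timestamp(col("mtime")).as("mtime_ts"))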
>>>
>>> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
>>> really slow. Can you try this on a single machine, i.e. run with "local[*]"?
>>>
>>> Can you rule out the writing part by counting the rows? I presume this
>>> all happens in a single stage.
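>>>
>>> Something like this (illustrative, with a placeholder path) isolates the
>>> read/transform side from the write:
>>>
>>>   import org.apache.spark.sql.SparkSession
>>>
>>>   val spark = SparkSession.builder()
>>>     .master("local[*]")              // one machine, all of its cores
>>>     .appName("csv-read-check")
>>>     .getOrCreate()
>>>
>>>   val df = spark.read
>>>     .option("header", "true")
>>>     .csv("/tmp/fs_dump.csv.bz2")     // placeholder path
>>>
>>>   // count() forces a full scan and decompression without writing anything,
>>>   // so the elapsed time is spent purely in the read (+ transform) stage.
>>>   val t0 = System.nanoTime()
>>>   val n = df.count()
>>>   println(s"rows = $n, took ${(System.nanoTime() - t0) / 1e9} s")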
>>>
>>> Enrico
>>>
>>>
>>> On 18.12.19 at 10:56, Antoine DUBOIS wrote:
>>>
>>> Hello
>>>
>>> I'm working on an ETL that transforms CSVs describing file systems into
>>> Parquet, so I can work on them easily to extract information.
>>> I'm using Mr. Powers' Daria framework to do so. I have quite different
>>> inputs and a lot of transformations, and the framework helps organize the
>>> code.
>>> I have a standalone Spark 2.3.2 cluster composed of 4 nodes with 8 cores
>>> and 32GB of memory each.
>>> The storage is handled by a CephFS volume mounted on all nodes.
>>> First, a small description of my algorithm (it's quite simple; a rough
>>> sketch in code follows the list):
>>>
>>> Use SparkContext to load the csv.bz2 file,
>>> Chain a lot of withColumn() statements,
>>> Drop all unnecessary columns,
>>> Write the Parquet file to CephFS.
>>>
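>>> In code, the shape is roughly this (paths and column names are
>>> placeholders, not the real job):
>>>
>>>   import org.apache.spark.sql.SparkSession
>>>   import org.apache.spark.sql.functions._
>>>
>>>   val spark = SparkSession.builder().appName("fs-csv-to-parquet").getOrCreate()
>>>
>>>   val raw = spark.read
>>>     .option("header", "true")
>>>     .csv("/cephfs/input/fs_dump.csv.bz2")
>>>
>>>   val transformed = raw
>>>     .withColumn("size_mb", col("size_bytes").cast("long") / (1024 * 1024))
>>>     .withColumn("owner_lc", lower(col("owner")))
>>>     // ... many more withColumn calls ...
>>>     .drop("size_bytes", "owner")
>>>
>>>   transformed.write.mode("overwrite").parquet("/cephfs/output/fs_dump.parquet")
>>>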
>>> This treatment can take several hours depending on how many lines the
>>> CSV has, and I wanted to identify whether bz2 or the network could be an
>>> issue, so I ran the following tests (several times, with consistent results).
>>> I tried the following scenarios with 20 cores and 2 cores per task:
>>>
>>>    - Read the csv.bz2 from CephFS, with a 1Gb/s connection for each
>>>    node: ~5 minutes.
>>>    - Read the csv.bz2 from TMPFS (set up to look like a shared storage
>>>    space): ~5 minutes.
>>>    - From the 2 previous tests I concluded that decompressing the file
>>>    was part of the bottleneck, so I decompressed the file and stored it
>>>    in TMPFS as well; result: ~5.9 minutes.
>>>
>>> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
>>> uncompressed. Those results have been reproduced several times each.
>>> My question here is: what am I bottlenecked by in this case?
>>>
>>> I thought that the uncompressed file in RAM would be the fastest. Is it
>>> possible that my program is suboptimal when reading the CSV?
>>> In the execution logs on the cluster I see 5 to 10 seconds of GC time at
>>> most, and the timeline shows mainly CPU time (no shuffling, and no
>>> randomization overhead either).
>>> I also noticed that memory storage is never used during the execution.
>>> I know from several hours of research that bz2 is the only real compression
>>> codec usable as a splittable input in Spark, for parallelization reasons.
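>>>
>>> As a sanity check (illustrative snippet, reusing the spark session from
>>> the sketch above), the number of input partitions shows how many parallel
>>> read tasks the bz2 file actually produces:
>>>
>>>   val input = spark.read.option("header", "true").csv("/cephfs/input/fs_dump.csv.bz2")
>>>   println(s"input partitions = ${input.rdd.getNumPartitions}")
>>>   // 1 would mean a single task decompresses the whole file; a splittable
>>>   // codec like bz2 should be read by several tasks in parallel.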
>>>
>>> Do you have any idea why it behaves this way,
>>> and do you have any idea how to improve this processing?
>>>
>>> Cheers
>>>
>>> Antoine
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
Best Regards,
Ayan Guha
