Hi Cheng,
Is there some place where I can get more details on this, or could you
give a couple of lines explaining it?

> But given that memory usage from writers is not visible to Spark today,
> it seems to me there's no other good way to model the memory usage of a
> write.


regards
kalyan.

On Sat, Sep 5, 2020 at 12:07 AM Cheng Su <chen...@fb.com.invalid> wrote:

> Hi,
>
>
>
> Just for context - I created the JIRA for this around 2 years ago (
> https://issues.apache.org/jira/browse/SPARK-26164, with a stale PR that
> was never merged - https://github.com/apache/spark/pull/23163). I
> recently discussed it with Wenchen again, and it looks like it might be
> reasonable to:
>
>
>
>    1. Open multiple writers in parallel to write partitions/buckets.
>    2. If the number of writers exceeds a pre-defined threshold (controlled
>    by a config), sort the rest of the input rows and fall back to the
>    current single-writer mode (see the sketch below).
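>
>
> As a rough Scala sketch of that control flow (the ConcurrentWriters
> class, maxWriters, and the file layout are made up for illustration,
> not actual Spark code):
>
>    import scala.collection.mutable
>
>    // Hypothetical sketch: keep one writer per partition key until the
>    // threshold is hit, then signal the caller to sort and fall back.
>    class ConcurrentWriters(maxWriters: Int) {
>      private val writers = mutable.Map.empty[String, java.io.Writer]
>
>      def write(partitionKey: String, row: String): Boolean =
>        writers.get(partitionKey) match {
>          case Some(w) => w.write(row); true
>          case None if writers.size < maxWriters =>
>            val dir = new java.io.File(s"/tmp/out/key=$partitionKey")
>            dir.mkdirs()
>            val w = new java.io.FileWriter(new java.io.File(dir, "part-00000.txt"))
>            writers(partitionKey) = w
>            w.write(row); true
>          case None => false // threshold hit: sort remaining rows, fall back
>        }
>
>      def close(): Unit = writers.values.foreach(_.close())
>    }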
>
>
>
> The approach uses the number of writers as a proxy for memory usage, which
> I agree is quite rudimentary. But given that memory usage from writers is
> not visible to Spark today, it seems to me there's no other good way to
> model the memory usage of a write. Internally we did this the same way,
> but our internal ORC is customized to work better with our internal Spark
> on memory usage, so we don't see many OOM issues (on the non-vectorized
> code path).
>
>
>
> The config can be disabled by default to stay consistent with current
> behavior, and users can choose to opt in to the non-sort mode if they
> benefit from not sorting a large amount of data.
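>
>
> For example, opting in might look like the following (the config name
> here is hypothetical; whatever name is chosen, 0 would mean disabled):
>
>    // Hypothetical opt-in: allow up to 64 concurrent partition writers
>    // before sorting the remaining rows and falling back.
>    spark.conf.set("spark.sql.files.maxConcurrentWriters", "64")
>    df.write.partitionBy("columnA").parquet("/tmp/out")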
>
>
>
> Does this sound good as a plan? I would like to get more opinions on this.
> Thanks.
>
>
>
> Cheng Su
>
>
>
> *From: *Reynold Xin <r...@databricks.com>
> *Date: *Friday, September 4, 2020 at 10:33 AM
> *To: *XIMO GUANTER GONZALBEZ <joaquin.guantergonzal...@telefonica.com>
> *Cc: *Spark Dev List <dev@spark.apache.org>
> *Subject: *Re: Avoiding unnecessary sort in
> FileFormatWriter/DynamicPartitionDataWriter
>
>
>
>
> The issue is memory overhead. Writing files creates a lot of buffering
> (especially in columnar formats like Parquet/ORC). Even a few file handles
> and buffers per task can easily OOM the entire process.
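>
>
> To make that concrete: Parquet buffers roughly a full row group per open
> file before flushing, commonly 128 MB by default. With, say, 50 open
> writers per task and 4 tasks per executor, that is on the order of
> 50 x 128 MB x 4 = ~25 GB of write buffers alone, far beyond a typical
> executor heap. (Exact numbers depend on the format and configuration;
> this is only an order-of-magnitude illustration.)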
>
>
>
>
>
> On Fri, Sep 04, 2020 at 5:51 AM, XIMO GUANTER GONZALBEZ <
> joaquin.guantergonzal...@telefonica.com> wrote:
>
> Hello,
>
>
>
> I have observed that if a DataFrame is saved with partitioning columns in
> Parquet, then a sort is performed in FileFormatWriter (see
> https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L152)
> because DynamicPartitionDataWriter only supports having a single file open
> at a time (see
> https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L170-L171).
> I think it would be possible to avoid this sort (which is a major
> bottleneck in some of my scenarios) if DynamicPartitionDataWriter could
> keep multiple files open at the same time and write each piece of data
> to its corresponding file.
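>
>
> A minimal way to reproduce the sort (paths and data are just examples):
>
>    import spark.implicits._ // assuming a SparkSession named `spark`
>
>    // The partitioned write below triggers the sort on the partition
>    // column; it shows up in the physical plan of the write job.
>    val df = Seq((4, "A"), (1, "B"), (2, "C")).toDF("columnA", "columnB")
>    df.write.partitionBy("columnA").parquet("/tmp/partitioned-out")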
>
>
>
> Would that change be a welcome PR for the project or is there any major
> problem that I am not considering that would prevent removing this sort?
>
>
>
> Thanks,
>
> Ximo.
>
>
>
> Some more detail about the problem, in case I didn't explain myself
> clearly: suppose we have a dataframe that we want to partition by column
> A:
>
>
>
> *Column A* | *Column B*
> ---------- | ----------
>          4 | A
>          1 | B
>          2 | C
>
>
>
> The current behavior will first sort the dataframe:
>
>
>
> *Column A* | *Column B*
> ---------- | ----------
>          1 | B
>          2 | C
>          4 | A
>
>
>
> So that DynamicPartitionDataWriter can have a single file open, since all
> the data for a given partition is adjacent and can be iterated over
> sequentially. To process the first row, DynamicPartitionDataWriter will
> open a file at /columnA=1/part-r-00000-<uuid>.parquet and write the data.
> When processing the second row, it will see that the row belongs to a
> different partition, close the first file, open a new file at
> /columnA=2/part-r-00000-<uuid>.parquet, and so on.
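>
>
> In pseudocode terms, today's single-writer loop over the sorted input
> looks roughly like this (a simplified model, not the actual Spark code):
>
>    import java.io.{File, FileWriter, Writer}
>
>    // Input is sorted by the partition key, so one open writer suffices.
>    val sortedRows = Seq(("1", "B"), ("2", "C"), ("4", "A"))
>    var currentKey: Option[String] = None
>    var writer: Writer = null
>    for ((key, value) <- sortedRows) {
>      if (!currentKey.contains(key)) {
>        if (writer != null) writer.close() // partition changed: close old file
>        val dir = new File(s"/tmp/out/columnA=$key")
>        dir.mkdirs()
>        writer = new FileWriter(new File(dir, "part-r-00000.txt"))
>        currentKey = Some(key)
>      }
>      writer.write(value + "\n")
>    }
>    if (writer != null) writer.close()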
>
>
>
> My proposed change would make DynamicPartitionDataWriter keep as many
> files open as there are partitions, and close them all once all the data
> has been processed.
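>
>
> Roughly, that would replace the single writer with a map keyed by the
> partition value (again a simplified sketch, not the actual code):
>
>    import java.io.{File, FileWriter, Writer}
>    import scala.collection.mutable
>
>    // One open writer per partition value; no sort needed on the input.
>    val rows = Seq(("4", "A"), ("1", "B"), ("2", "C")) // unsorted
>    val writers = mutable.Map.empty[String, Writer]
>    for ((key, value) <- rows) {
>      val w = writers.getOrElseUpdate(key, {
>        val dir = new File(s"/tmp/out/columnA=$key")
>        dir.mkdirs()
>        new FileWriter(new File(dir, "part-r-00000.txt"))
>      })
>      w.write(value + "\n")
>    }
>    writers.values.foreach(_.close()) // close them all at the end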
>
>
