Hi Cheng,

I think there still isn’t a PR for this, right? Do you need any help? I am very 
interested in this feature getting into master, so I am happy to help or even 
move this feature forward based on the PR you opened, but I don’t want to 
“steal” the feature from you if you’re interested in pushing it forward 
yourself 😊 Let me know if you need an extra pair of hands!

Thanks,
Ximo.

From: Cheng Su <chen...@fb.com>
Sent: Wednesday, September 9, 2020 8:57
To: XIMO GUANTER GONZALBEZ <joaquin.guantergonzal...@telefonica.com>; Reynold 
Xin <r...@databricks.com>
Cc: Spark Dev List <dev@spark.apache.org>
Subject: Re: Avoiding unnnecessary sort in 
FileFormatWriter/DynamicPartitionDataWriter

Thanks, Ximo. On our side, we see similar cases in production as well, and we 
added this feature internally a couple of years ago. Let me submit a new PR 
(mostly a rebase of https://github.com/apache/spark/pull/23163 onto the latest 
master, with an attempt at a better code structure), if there are no objections.

Thanks,
Cheng Su

From: XIMO GUANTER GONZALBEZ <joaquin.guantergonzal...@telefonica.com>
Date: Sunday, September 6, 2020 at 10:55 PM
To: Cheng Su <chen...@fb.com>, Reynold Xin <r...@databricks.com>
Cc: Spark Dev List <dev@spark.apache.org>
Subject: RE: Avoiding unnnecessary sort in 
FileFormatWriter/DynamicPartitionDataWriter

> 2. If the number of writers exceeds a pre-defined threshold (controlled by 
> a config), sort the rest of the input rows and fall back to the current mode 
> for writing.
> The config can be disabled by default to be consistent with current behavior, 
> and users can opt in to non-sort mode if they benefit from not sorting a 
> large amount of data.

With both of those points in place, I think the plan is super reasonable: it 
wouldn’t affect anyone who isn’t actively tuning Spark, and it gives those of 
us who are hitting this sort the tools to improve performance in our 
scenario.

Cheers,
Ximo.

From: Cheng Su <chen...@fb.com>
Sent: Friday, September 4, 2020 20:38
To: Reynold Xin <r...@databricks.com>; XIMO GUANTER GONZALBEZ 
<joaquin.guantergonzal...@telefonica.com>
Cc: Spark Dev List <dev@spark.apache.org>
Subject: Re: Avoiding unnnecessary sort in 
FileFormatWriter/DynamicPartitionDataWriter

Hi,

Just for context: I created the JIRA for this around 2 years ago 
(https://issues.apache.org/jira/browse/SPARK-26164), and there is a stale PR 
that was never merged (https://github.com/apache/spark/pull/23163). I recently 
discussed this with Wenchen again, and it looks like it might be reasonable to:


  1.  Open multiple writers in parallel to write partitions/buckets.
  2.  If the number of writers exceeds a pre-defined threshold (controlled by a 
config), sort the rest of the input rows and fall back to the current mode for 
writing.

The approach uses the number of writers as a proxy for memory usage, which I 
agree is quite rudimentary. But given that the memory usage of writers is not 
currently visible to Spark, it seems to me there is no other good way to model 
memory usage for writes. Internally we do it the same way, but our internal ORC 
is customized to work better with our internal Spark with respect to memory 
usage, so we don’t see many OOM issues (non-vectorized code path).

The config can be disabled by default to be consistent with current behavior, 
and users can opt in to non-sort mode if they benefit from not sorting a large 
amount of data.
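
The two-step plan above can be sketched in plain Python as a simulation, with no Spark dependency. Everything here is an illustrative assumption rather than the code from the PR: the function name `write_with_writer_limit`, the `max_writers` parameter standing in for the proposed config, and the choice to model "disabled" as `max_writers=0`. Files are represented as in-memory lists of rows.

```python
def write_with_writer_limit(rows, key, max_writers):
    """Write rows into per-partition 'files', sorting only when needed.

    Returns (files, sorted_rows): files is a list of (partition, rows)
    pairs in the order the files were opened; sorted_rows counts the
    rows that went through the fallback sort.
    """
    rows = list(rows)
    files = []       # every file ever opened, as (partition, [rows])
    open_files = {}  # partition value -> row list of its open file
    for i, row in enumerate(rows):
        part = row[key]
        if part not in open_files:
            if len(open_files) >= max_writers:
                # Limit reached: stop opening concurrent writers, sort the
                # remaining rows, and write them one file at a time.
                rest = sorted(rows[i:], key=lambda r: r[key])
                current = None
                for r in rest:
                    if current is None or current[0] != r[key]:
                        current = (r[key], [])
                        files.append(current)
                    current[1].append(r)
                return files, len(rest)
            open_files[part] = []
            files.append((part, open_files[part]))
        open_files[part].append(row)
    return files, 0


# Example rows taken from the table in the original message of this thread.
rows = [{"A": 4, "B": "A"}, {"A": 1, "B": "B"}, {"A": 2, "B": "C"}]

no_sort = write_with_writer_limit(rows, "A", max_writers=3)
partial = write_with_writer_limit(rows, "A", max_writers=2)
disabled = write_with_writer_limit(rows, "A", max_writers=0)
```

With a limit of 3 nothing is sorted, with a limit of 2 only the last row goes through the fallback, and with the limit at 0 the whole input is sorted, which matches the current behavior. Note that in this sketch a partition whose writer was already open before the fallback can end up with a second file.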

Does this sound good as a plan? I would like to get more opinions on this. Thanks.

Cheng Su

From: Reynold Xin <r...@databricks.com>
Date: Friday, September 4, 2020 at 10:33 AM
To: XIMO GUANTER GONZALBEZ <joaquin.guantergonzal...@telefonica.com>
Cc: Spark Dev List <dev@spark.apache.org>
Subject: Re: Avoiding unnnecessary sort in 
FileFormatWriter/DynamicPartitionDataWriter


The issue is memory overhead. Writing files creates a lot of buffers (especially 
in columnar formats like Parquet/ORC). Even a few file handles and buffers per 
task can easily OOM the entire process.
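
As a rough illustration of why this adds up, the back-of-the-envelope sketch below multiplies the open writers per task by a per-writer buffer size and by the number of tasks running concurrently in one process. The function name and every number in it (64 MB per writer, 8 concurrent tasks) are hypothetical values picked for the example; they are not measurements or Spark defaults.

```python
def estimated_writer_memory_mb(open_writers_per_task, buffer_mb_per_writer,
                               concurrent_tasks):
    """Rough upper bound on write-buffer memory in one process, in MB."""
    return open_writers_per_task * buffer_mb_per_writer * concurrent_tasks


# Hypothetical numbers, chosen only to show how the product grows.
single_writer = estimated_writer_memory_mb(1, 64, 8)
ten_writers = estimated_writer_memory_mb(10, 64, 8)
print(single_writer, ten_writers)
```

Under these assumed numbers, going from one open writer per task to ten grows the estimate tenfold, which is the concern raised above.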


On Fri, Sep 04, 2020 at 5:51 AM, XIMO GUANTER GONZALBEZ 
<joaquin.guantergonzal...@telefonica.com> wrote:
Hello,

I have observed that if a DataFrame is saved with partitioning columns in 
Parquet, then a sort is performed in FileFormatWriter (see 
https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L152)
 because DynamicPartitionDataWriter only supports having a single file open at 
a time (see 
https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L170-L171).
I think it would be possible to avoid this sort (which is a major bottleneck 
for some of my scenarios) if DynamicPartitionDataWriter could keep multiple 
files open at the same time and write each row to its corresponding file.

Would that change be a welcome PR for the project or is there any major problem 
that I am not considering that would prevent removing this sort?

Thanks,
Ximo.




Some more detail about the problem, in case I didn’t explain myself correctly: 
suppose we have a dataframe which we want to partition by column A:

Column A | Column B
-------- | --------
4        | A
1        | B
2        | C

The current behavior will first sort the dataframe:

Column A | Column B
-------- | --------
1        | B
2        | C
4        | A

This is so that DynamicPartitionDataWriter can keep a single file open, since 
all the data for a single partition will be adjacent and can be iterated over 
sequentially. To process the first row, DynamicPartitionDataWriter will open a 
file in /columnA=1/part-r-00000-<uuid>.parquet and write the data. When 
processing the second row, it will see that the row belongs to a different 
partition, close the first file, and open a new file in 
/columnA=2/part-r-00000-<uuid>.parquet, and so on.
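
The sort-then-write behavior described above can be sketched as a small simulation. This is not the Spark implementation: `write_sorted` is a hypothetical helper that only records the order in which files would be opened, written to, and closed, using the three example rows from the table.

```python
def write_sorted(rows, key):
    """Sort by the partition column, then write with one open file at a time.

    Returns an ordered log of ("open" | "write" | "close", partition) events.
    """
    events = []
    current = None  # partition of the single file that is currently open
    for row in sorted(rows, key=lambda r: r[key]):
        if row[key] != current:
            if current is not None:
                events.append(("close", current))
            current = row[key]
            events.append(("open", current))
        events.append(("write", current))
    if current is not None:
        events.append(("close", current))
    return events


rows = [{"A": 4, "B": "A"}, {"A": 1, "B": "B"}, {"A": 2, "B": "C"}]
events = write_sorted(rows, "A")
```

Because the rows arrive sorted, each file is closed before the next one is opened, so at most one file is open at any point in the log.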

My proposed change would involve changing DynamicPartitionDataWriter to keep as 
many files open as there are partitions, and to close them all once all data 
has been processed.
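
A minimal sketch of this proposal, again as a plain-Python simulation rather than Spark code: `write_unsorted` is a hypothetical helper that keeps one in-memory "file" per partition, routes each row to it in arrival order, and reports the peak number of files open at once.

```python
def write_unsorted(rows, key):
    """Route each row to its partition's file without sorting first.

    Returns (files, max_open): files maps partition -> rows written, in the
    order the files were opened; max_open is the peak number of open files.
    """
    open_files = {}
    max_open = 0
    for row in rows:
        # Open the partition's file on first use, then append the row.
        open_files.setdefault(row[key], []).append(row)
        max_open = max(max_open, len(open_files))
    # All files would be closed here, once the input is exhausted.
    return open_files, max_open


rows = [{"A": 4, "B": "A"}, {"A": 1, "B": "B"}, {"A": 2, "B": "C"}]
files, max_open = write_unsorted(rows, "A")
```

The sort is gone, but the number of simultaneously open files now equals the number of distinct partitions seen by the task, which is where the memory concern raised in the replies comes from.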

________________________________

The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.


