Hi Cheng,

Is there some place where I can get more details on this, or could you give a couple of lines explaining it?
> But given memory usage from writers is non-visible to spark now, it seems
> to me that there's no other good way to model the memory usage for write.

regards,
kalyan.

On Sat, Sep 5, 2020 at 12:07 AM Cheng Su <chen...@fb.com.invalid> wrote:

> Hi,
>
> Just for context - I created the JIRA for this around 2 years ago
> (https://issues.apache.org/jira/browse/SPARK-26164, with a stale PR that
> was never merged - https://github.com/apache/spark/pull/23163). I recently
> discussed this with Wenchen again, and it looks like it might be
> reasonable to:
>
> 1. Open multiple writers in parallel to write partitions/buckets.
> 2. If the number of writers exceeds a pre-defined threshold (controlled
>    by a config), sort the rest of the input rows and fall back to the
>    current mode for the write.
>
> The approach uses the number of writers as a proxy for memory usage. I
> agree this is quite rudimentary, but given that memory usage from writers
> is not visible to Spark right now, it seems to me that there's no other
> good way to model the memory usage of a write. Internally we did the same
> thing, but our internal ORC is customized to work better with our internal
> Spark on memory usage, so we don't see much issue with OOM (on the
> non-vectorized code path).
>
> The config can be disabled by default to be consistent with current
> behavior, and users can choose to opt in to non-sort mode if they benefit
> from not sorting a large amount of data.
>
> Does it sound good as a plan? I would like to get more opinions on this.
> Thanks.
>
> Cheng Su
>
> From: Reynold Xin <r...@databricks.com>
> Date: Friday, September 4, 2020 at 10:33 AM
> To: XIMO GUANTER GONZALBEZ <joaquin.guantergonzal...@telefonica.com>
> Cc: Spark Dev List <dev@spark.apache.org>
> Subject: Re: Avoiding unnecessary sort in
> FileFormatWriter/DynamicPartitionDataWriter
>
> The issue is memory overhead. Writing files creates a lot of buffers
> (especially in columnar formats like Parquet/ORC). Even a few file handles
> and buffers per task can OOM the entire process easily.
>
> On Fri, Sep 04, 2020 at 5:51 AM, XIMO GUANTER GONZALBEZ <
> joaquin.guantergonzal...@telefonica.com> wrote:
>
> Hello,
>
> I have observed that if a DataFrame is saved with partitioning columns in
> Parquet, then a sort is performed in FileFormatWriter (see
> https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L152)
> because DynamicPartitionDataWriter only supports having a single file open
> at a time (see
> https://github.com/apache/spark/blob/v3.0.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala#L170-L171).
> I think it would be possible to avoid this sort (which is a major
> bottleneck for some of my scenarios) if DynamicPartitionDataWriter could
> have multiple files open at the same time, writing each piece of data to
> its corresponding file.
>
> Would that change be a welcome PR for the project, or is there any major
> problem that I am not considering that would prevent removing this sort?
>
> Thanks,
>
> Ximo.
> Some more detail about the problem, in case I didn't explain myself
> correctly: suppose we have a dataframe which we want to partition by
> column A:
>
>   Column A | Column B
>   ---------+---------
>       4    |    A
>       1    |    B
>       2    |    C
>
> The current behavior will first sort the dataframe:
>
>   Column A | Column B
>   ---------+---------
>       1    |    B
>       2    |    C
>       4    |    A
>
> so that DynamicPartitionDataWriter can have a single file open, since all
> the data for a single partition will be adjacent and can be iterated over
> sequentially. In order to process the first row, DynamicPartitionDataWriter
> will open a file in /columnA=1/part-r-00000-<uuid>.parquet and write the
> data. When processing the second row, it will see that it belongs to a
> different partition, close the first file, open a new file in
> /columnA=2/part-r-00000-<uuid>.parquet, and so on.
>
> My proposed change would involve changing DynamicPartitionDataWriter to
> have as many open files as there are partitions, and close them all once
> all data has been processed.
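To make Ximo's example concrete, here is a minimal, self-contained sketch of the current behavior using only the public DataFrame API (the app name and output path are made up for the demo; only partitionBy matters):

import org.apache.spark.sql.SparkSession

object PartitionedWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partitioned-write-example") // arbitrary name for this demo
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Ximo's example dataframe, in its original (unsorted) order.
    val df = Seq((4, "A"), (1, "B"), (2, "C")).toDF("columnA", "columnB")

    // partitionBy("columnA") makes FileFormatWriter insert a sort on the
    // partition column, so DynamicPartitionDataWriter sees (1,B), (2,C),
    // (4,A) and can keep a single file open, rolling to a new file under
    // /columnA=<value>/ whenever the partition value changes.
    df.write.partitionBy("columnA").parquet("/tmp/partitioned-write-example")

    spark.stop()
  }
}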
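And a rough sketch of the plan Cheng outlines: keep one writer per partition until a threshold is hit, then sort whatever input is left and fall back to the current single-writer mode. Everything below is hypothetical - plain text files stand in for Parquet/ORC writers, and ConcurrentOutputWriter / maxOpenWriters are invented names. It only illustrates the control flow, not the actual FileFormatDataWriter internals:

import java.io.{File, PrintWriter}
import scala.collection.mutable

class ConcurrentOutputWriter(baseDir: String, maxOpenWriters: Int) {

  // One open writer per partition value seen so far.
  private val writers = mutable.Map.empty[String, PrintWriter]

  private def open(partition: String, fileName: String): PrintWriter = {
    val dir = new File(baseDir, s"columnA=$partition")
    dir.mkdirs()
    new PrintWriter(new File(dir, fileName))
  }

  /** Writes (partition, row) pairs, falling back to sorted single-writer
    * mode once opening another writer would exceed the threshold. */
  def writeAll(rows: Iterator[(String, String)]): Unit = {
    var fellBack = false
    while (rows.hasNext && !fellBack) {
      val (partition, row) = rows.next()
      if (writers.contains(partition) || writers.size < maxOpenWriters) {
        writers.getOrElseUpdate(partition, open(partition, "part-r-00000.txt"))
          .println(row)
      } else {
        // The number of open writers is the proxy for memory pressure:
        // sort the current row plus everything remaining, then stream it
        // through one writer at a time (today's behavior).
        fellBack = true
        writeSorted((Iterator((partition, row)) ++ rows).toSeq.sortBy(_._1))
      }
    }
    writers.values.foreach(_.close())
    writers.clear()
  }

  // Classic mode: the input is sorted, so each partition's rows are
  // adjacent and only one file needs to be open at a time.
  private def writeSorted(sorted: Seq[(String, String)]): Unit = {
    var current: Option[(String, PrintWriter)] = None
    for ((partition, row) <- sorted) {
      if (!current.exists(_._1 == partition)) {
        current.foreach(_._2.close())
        // A different file name avoids clobbering anything written
        // before the fallback kicked in.
        current = Some(partition -> open(partition, "part-r-00001.txt"))
      }
      current.get._2.println(row)
    }
    current.foreach(_._2.close())
  }
}

With Ximo's rows and a threshold of 2, for example,

  new ConcurrentOutputWriter("/tmp/out", 2)
    .writeAll(Iterator(("4", "A"), ("1", "B"), ("2", "C")))

opens writers for partitions 4 and 1, hits the threshold on partition 2, and sorts only the single remaining row - so no sort happens at all until the writer count actually becomes a problem.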