There are other ways to prepare the data. You just need to make sure that the data for each partition is clustered so that you don’t open more than one file per partition in a task.
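For example, for a table partitioned by a date column, that clustering could look roughly like this. This is only a sketch: the column names (event_date, id), the table identifier, and the use of Scala are placeholders for whatever your job actually uses.

  import org.apache.spark.sql.functions.col

  // Option 1: global sort by the partition columns plus a high-cardinality column.
  // This clusters the rows for each partition and keeps output files evenly sized.
  val globallySorted = df.orderBy(col("event_date"), col("id"))

  // Option 2: local sort. Hash-partition by the partition column so all rows for
  // a given date land in the same task, then sort each task's data locally.
  val locallySorted = df
    .repartition(col("event_date"))
    .sortWithinPartitions(col("event_date"), col("id"))

  globallySorted.write
    .format("iceberg")
    .mode("append")
    .save("db.table")  // placeholder table name

Either way, the goal is the same: all of the rows for a given partition arrive at a task together, so the writer only ever has one open file per partition.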
A global sort is usually the best way to do that, but you can use a local sort as well, with sortWithinPartitions. That will sort each task's data locally, so you can use it to cluster the data by partition.

Another option is to try to reduce the cost of the sort by looking at what the job is doing. A sort in Spark will run two jobs: one to estimate skew and produce the sort partition ranges, and a second to shuffle the data using those ranges and finally write. When those jobs run, the second will reuse the work done by the first if there is shuffle data with the unsorted rows already available. We often see jobs that don't have shuffle data available take a lot longer because some expensive operation is run twice: either the data is read from the table twice, or an expensive UDF runs twice. In that case, I recommend adding a round-robin repartition (df.repartition(1000)) to add a shuffle after the expensive part of the query, so that the second job can skip the expensive stages before that point.

From the description here about not having enough shuffle space, it sounds like the first option makes the most sense if you have the temp space for that operation. But it could be that the second option works because it ensures that the data going into the shuffle is evenly distributed through the cluster, which might help if you're running out of space on a single node. Also make sure that your parallelism is high enough that you're using the entire cluster and not just a few nodes.

rb

On Fri, Sep 6, 2019 at 9:19 AM Xabriel Collazo Mojica <xcoll...@adobe.com.invalid> wrote:

> Hi folks,
>
> We are now consistently hitting this problem:
>
> 1) We want to write a data set in the order of 1-5TB of data into a partitioned Iceberg table.
>
> 2) Iceberg, on a single write, will fail a partitioned writer if it detects the data is not sorted. This has been discussed before in this list, and as per Ryan, this behavior is there to avoid the small files problem.
>
> 3) Thus, you are forced to do a sort. Ryan recommends to ‘sort by all partitioning columns, plus a high cardinality column like an ID’ (See Iceberg Mail Dev thread “Timeout waiting for connection from pool (S3)“).
>
> 4) For small writes, this is no problem, but I have had many job failures by trying to sort and running out of temp space while shuffling. The worker nodes are well configured, with 800GB of temp space available.
>
> 5) When it does succeed, then I have the issue that the sort consistently makes writes take at least double the time.
>
> To summarize:
>
> Given the cost of sorting TBs of data, any tips on what to do in Iceberg to avoid it?
>
> Should we allow unsorted writes to go thru with a flag?
>
> Thanks,
>
> *Xabriel J Collazo Mojica* | Senior Software Engineer | Adobe | xcoll...@adobe.com

--
Ryan Blue
Software Engineer
Netflix