There are other ways to prepare the data. You just need to make sure that
the data for each partition is clustered so that you don’t open more than
one file per partition in a task.

A global sort is usually the best way to do that, but you can use a local
sort instead with sortWithinPartitions. That sorts each task's data
locally, which is enough to cluster each task's data by partition.
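For example, here's a rough sketch of the two approaches in Spark, assuming
df is the DataFrame you're about to write and the table is partitioned by an
event_date column, with id as a high-cardinality column (those names are just
placeholders):

    import org.apache.spark.sql.functions.col

    // Option 1: global sort, which range-partitions the data across the cluster
    val globallySorted = df.orderBy(col("event_date"), col("id"))

    // Option 2: local sort, which only sorts each task's data in place
    val locallySorted = df.sortWithinPartitions("event_date", "id")

Either way, the rows a task writes for a given partition are contiguous, so the
writer keeps at most one open file per partition. The local sort skips the
range-partitioning shuffle, but can produce more files per partition because
several tasks may still write to the same partition.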

Another option is to try to reduce the cost of the sort by looking at what
the job is doing. A sort in Spark will run two jobs: one to estimate skew
and produce the sort partition ranges, and a second to shuffle the data
using those ranges and finally write. When those jobs run, the second will
reuse the work done by the first if there is shuffle data available with
the unsorted rows. We often see jobs that don't have shuffle data available
taking a lot longer because some expensive operation is run twice: either
reading the data from the table twice, or running an expensive UDF twice.
In that case, I recommend adding a round-robin repartition
(df.repartition(1000)) to add a shuffle after the expensive part of the
query, so that the second job can skip the expensive stages before that
point.
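
A rough sketch of that pattern (the table, column, and UDF names here are
placeholders for whatever the expensive part of your query actually is):

    import org.apache.spark.sql.functions.{col, udf}

    // Placeholder for whatever expensive transformation the job runs
    val expensiveUdf = udf((s: String) => s.toUpperCase)

    val prepared = spark.table("db.source")
      .withColumn("enriched", expensiveUdf(col("payload")))
      // Round-robin shuffle: the expensive stages write their output as
      // shuffle data here, so the sort's second job can start from this
      // point instead of re-running them
      .repartition(1000)

    // Global sort on top of the repartitioned data, then write as usual
    val sorted = prepared.orderBy(col("event_date"), col("id"))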

From the description here about not having enough shuffle space, it sounds
like the first option makes the most sense if you have the temp space for
that operation. But it could be that the second option works because it
ensures that the data going into the shuffle is evenly distributed through
the cluster, which might help if you’re running out of space on a single
node. Also make sure that your parallelism is high enough that you're using
the entire cluster and not just a few nodes.
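
If parallelism turns out to be the problem, the shuffle partition setting is
the usual knob to check (a sketch; 2000 is only an illustrative value, not a
recommendation):

    // Number of partitions used for shuffles, including the sort before the write
    spark.conf.set("spark.sql.shuffle.partitions", "2000")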

rb

On Fri, Sep 6, 2019 at 9:19 AM Xabriel Collazo Mojica
<xcoll...@adobe.com.invalid> wrote:

> Hi folks,
>
>
>
> We are now consistently hitting this problem:
>
>
>
> 1) We want to write a data set in the order of 1-5TB of data into a
> partitioned Iceberg table.
>
> 2) Iceberg, on a single write, will fail a partitioned writer if it
> detects the data is not sorted. This has been discussed before in this
> list, and as per Ryan, this behavior is there to avoid the small files
> problem.
>
> 3) Thus, you are forced to do a sort. Ryan recommends to ‘sort by all
> partitioning columns, plus a high cardinality column like an ID’ (See
> Iceberg Mail Dev thread “Timeout waiting for connection from pool (S3)“).
>
> 4) For small writes, this is no problem, but I have had many job failures
> by trying to sort and running out of temp space while shuffling. The worker
> nodes are well configured, with 800GB of temp space available.
>
> 5) When it does succeed, then I have the issue that the sort consistently
> makes writes take at least double the time.
>
>
>
> To summarize:
>
>
>
> Given the cost of sorting TBs of data, any tips on what to do in Iceberg
> to avoid it?
>
> Should we allow unsorted writes to go thru with a flag?
>
>
>
> Thanks,
>
> *Xabriel J Collazo Mojica*  |  Senior Software Engineer  |  Adobe  |
> xcoll...@adobe.com
>


-- 
Ryan Blue
Software Engineer
Netflix
