Mayur,

Is it possible to connect to this process with a profiler and look at
what's taking up all of the space?

I suspect that what's happening here is that you're loading the list of
snapshots for each version of metadata, so you're holding a lot of copies
of the entire snapshot history and possibly caching the list of manifests
for some snapshots as well.

I've thought about adding a way to avoid parsing and loading snapshots,
probably by passing a cache when loading metadata so that all the copies of
a table can share snapshots in memory. That would work fine because they're
immutable. That might help you here, although a Snapshot instance will
cache manifests after loading them if they are accessed, so you'd want to
watch out for that as well.
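
Roughly, the idea would look something like the sketch below (this is not an
existing Iceberg API, just an illustration of the shape it could take): a
shared cache keyed by snapshot ID that metadata parsing consults, so every
copy of the table metadata reuses the same Snapshot instances.

// Hypothetical sketch only -- not current Iceberg API.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.iceberg.Snapshot;

class SnapshotCache {
  private final Map<Long, Snapshot> byId = new ConcurrentHashMap<>();

  // Return the cached instance if present; otherwise parse and cache it.
  // Sharing one instance across table copies is safe because Snapshots
  // are immutable.
  Snapshot getOrLoad(long snapshotId, Supplier<Snapshot> parse) {
    return byId.computeIfAbsent(snapshotId, id -> parse.get());
  }
}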

The best step forward is to get an idea of what objects are taking up that
space with a profiler or heap dump if you can.
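
If attaching a profiler is inconvenient, you can also trigger a heap dump from
inside the process with the standard HotSpot diagnostic MXBean (plain JDK,
nothing Iceberg-specific) and open the resulting file in Eclipse MAT or
VisualVM:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDump {
  // Writes a heap dump to the given path (should end in .hprof);
  // live=true keeps only reachable objects.
  public static void dump(String path) throws Exception {
    MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    HotSpotDiagnosticMXBean mxBean = ManagementFactory.newPlatformMXBeanProxy(
        server, "com.sun.management:type=HotSpotDiagnostic",
        HotSpotDiagnosticMXBean.class);
    mxBean.dumpHeap(path, true);
  }
}

jmap -dump:live,format=b,file=heap.hprof <pid> does the same thing from the
command line.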

Ryan

On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
mayur.srivast...@twosigma.com> wrote:

> Hi Iceberg Community,
>
>
>
> I’m running some experiments with high commit contention (concurrent commits
> to different partitions of the same Iceberg table), and I’m observing very
> high memory usage (5 GB to 7 GB). (Note that the data being written is very
> small.)
>
>
>
> *The scenario is described below:*
>
>
>
> *Note1: The catalog used is similar to the JDBC catalog.*
>
> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk to
> S3.*
>
> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
> branch.*
>
>
>
> *Experiment params:*
>
> a. NT = 64 = number of parallel committers. Achieved using multiple
> threads within the same process.
>
> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>
> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>
> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
> enough retries are done so that all committers finish successfully).
>
>
>
> *Steps:*
>
>
>
> * *Create an Iceberg table* with three columns: time (timestamp without
> timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>
> * Sequential step: create *NT different AppendFiles* objects.
>
> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month) and
> append the DataFile to the corresponding AppendFiles. Basically, we create
> one Parquet file per month (i.e. per partition) containing a single row. This
> keeps the data size small for the experiment and ensures that each commit
> touches a different partition.
>
> * *Parallel commit step*: Create a thread pool of NT threads, submit a
> Runnable that calls *AppendFiles.commit()* for each committer, and collect
> the Futures, i.e. run the commits in parallel.
>
> * Wait for all Futures to finish.
>
> I ran this experiment with various parameter values. For example, I varied
> minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT in (8, 16, 32,
> 64, 128). Code snippets can be found below.
>
>
>
> *Observations:*
>
> A. Total elapsed commit time increases with the number of committers, which
> is expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed
> commit time is more than 250 s. This is acceptable given the nature of OCC
> under high concurrency.
>
> B. The number of table metadata files is a multiple of the number of
> committers. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number
> of table metadata JSON files was 380. This is acceptable given the nature of
> OCC under high concurrency.
>
> *C. Memory usage keeps shooting up periodically, reaching 7 GB in some
> experiments. This is noticeable (i.e. memory usage > 1 GB) when the number of
> concurrent committers is >= 16 and becomes worse as the number of committers
> increases. I haven’t investigated further, but it could be that the in-memory
> metadata (snapshots, etc.) is growing very large. If I serialize the commit
> attempts (e.g. by acquiring a lock), the high memory usage problem goes away.
> But I wanted to check here before trying out any alternative.*
>
>
>
> *Why is the concurrent commit important to us?*
>
> We have several users who use various processing engines to schedule their
> writes (into non-overlapping partitions) through a data service that takes
> care of writing and committing the data. In many cases, they end up in the
> high commit contention scenario described above. My main worry is that this
> is happening for a single table; if we have multiple tables being committed
> concurrently, the memory usage will be much larger.
>
>
>
> *Questions:*
>
> 1. Have others observed this behavior? Is the high memory usage expected, or
> am I doing something wrong? Is there any way to reduce the memory footprint
> (e.g. by changing some metadata config) during the commit?
>
> 2. What is the general recommendation for highly concurrent committers? Are
> highly concurrent committers an anti-pattern for Iceberg?
>
>
>
> Thanks,
>
> Mayur
>
> *Code snippets:*
>
>
>
> Schema schema = new Schema(
>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>     NestedField.of(2, false, "id", IntegerType.get()),
>     NestedField.of(3, false, "value", DoubleType.get())
> );
>
> catalog.createTable(
>     tableIdentifier,
>     schema,
>     PartitionSpec.builderFor(schema).month("time").build(),
>     ImmutableMap.of(
>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>         "write.metadata.previous-versions-max", String.valueOf(1)
>     ) // properties
> );
>
> // Write data phase.
> List<AppendFiles> appendFilesList = new ArrayList<>();
> for (int m = 0; m < NT; m++) {
>     appendFilesList.add(table.newAppend());
> }
>
> for (int m = 0; m < NT; m++) {
>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>     ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
>     writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
> }
>
> // Commit phase.
> // High memory usage starts from the commit phase.
> ExecutorService executors = Executors.newFixedThreadPool(NT);
> List<Future<?>> futures = new ArrayList<>();
> for (int m = 0; m < NT; m++) {
>     final int i = m;
>     futures.add(executors.submit(() -> {
>         appendFilesList.get(i).commit();
>     }));
> }
>
> for (int m = 0; m < NT; m++) {
>     futures.get(m).get();
> }
>
> executors.shutdownNow();
>
> // Snippet of writeRecords().
> private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
>         throws IOException {
>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>     try (var writer = new PartitionedWriterImpl(table)) {
>         for (var record : records) {
>             writer.write(record);
>         }
>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>     }
> }
>
>
>
> Following is the heap usage chart for one of the experiments, showing the
> very high heap usage. The initial low-usage portion is the data writes; the
> high heap usage starts with the commit phase.
>
>

-- 
Ryan Blue
Tabular
