Hi

Igor, does fs.gs.outputstream.upload.chunk.size affect the file size I can
upload? Can I upload e.g. a 1 GB Parquet file while also setting
fs.gs.outputstream.upload.chunk.size=8388608 (8 MiB)?

Best
PF


On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <i...@google.com.invalid> wrote:

> No, right now this is a global property for the Hadoop FS instance. You
> either need to use different clients/Hadoop FS instances to write different
> files, or switch to the direct upload mode
> (fs.gs.outputstream.direct.upload.enable=true), which could be better for
> your use case depending on the Parquet file sizes that you write: in this
> write mode nothing is cached in memory and data is streamed to GCS
> directly, but failed uploads cannot be retried.
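>
> As a minimal sketch (assuming the standard Hadoop Configuration API and a
> hypothetical bucket name; imports: org.apache.hadoop.conf.Configuration,
> org.apache.hadoop.fs.FileSystem, java.net.URI), enabling direct upload
> could look like:
>
> Configuration conf = new Configuration();
> // Stream bytes straight to GCS instead of buffering upload chunks in memory;
> // failed uploads cannot be retried in this mode.
> conf.setBoolean("fs.gs.outputstream.direct.upload.enable", true);
> FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);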
>
> Also, you may want to test how critical the 64 MiB buffer size is for your
> application; it may be the case that 16 MiB, for example, will give you the
> desired performance for Parquet file writes with good enough memory
> consumption.
>
> But on a broader note, this seems to be one of the reasons why it could be
> good to have a specialized Iceberg GcsFileIO; if the Iceberg API allows it,
> it could have separate write configurations optimized for metadata and data
> files.
>
> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <
> mayur.srivast...@twosigma.com> wrote:
>
>> Thanks Igor. This may help mitigate the problem.
>>
>>
>>
>> But it looks like it applies to all files. We still want data (Parquet)
>> files to allocate 64 MiB (which seems reasonable); for metadata, a smaller
>> size is better. Is there a way to set the property based on file suffix or
>> file type?
>>
>>
>>
>> Thanks,
>>
>> Mayur
>>
>>
>>
>> *From:* Igor Dvorzhak <i...@google.com.INVALID>
>> *Sent:* Thursday, December 2, 2021 8:09 PM
>> *To:* dev@iceberg.apache.org
>> *Subject:* Re: High memory usage with highly concurrent committers
>>
>>
>>
>> For each written object the GCS connector allocates ~64 MiB of memory by
>> default to improve the performance of large object writes. If you want to
>> reduce memory utilization in cases when you write many files at once, you
>> just need to reduce the upload chunk size to, for example, 8 MiB:
>> fs.gs.outputstream.upload.chunk.size=8388608
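>>
>> As a minimal sketch (assuming the standard Hadoop Configuration API and a
>> hypothetical bucket name; imports: org.apache.hadoop.conf.Configuration,
>> org.apache.hadoop.fs.FileSystem, java.net.URI), this could be set
>> programmatically before the FileSystem is created. With 128 concurrent
>> writers, 64 MiB buffers alone would account for roughly 8 GiB, versus
>> roughly 1 GiB at 8 MiB:
>>
>> Configuration conf = new Configuration();
>> // Reduce the per-output-stream upload buffer from the ~64 MiB default to 8 MiB.
>> conf.set("fs.gs.outputstream.upload.chunk.size", "8388608");
>> FileSystem fs = FileSystem.get(URI.create("gs://example-bucket/"), conf);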
>>
>>
>>
>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <
>> mayur.srivast...@twosigma.com> wrote:
>>
>> That is correct Daniel.
>>
>>
>>
>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting
>> gs:// prefix …” thread.
>>
>>
>>
>> Thanks,
>>
>> Mayur
>>
>>
>>
>> *From:* Daniel Weeks <daniel.c.we...@gmail.com>
>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>> *Subject:* Re: High memory usage with highly concurrent committers
>>
>>
>>
>> I feel like what Mayur was saying is that S3FileIO actually works with
>> GCS (it appears there is some S3 compatible API for GCS).
>>
>>
>>
>> If that is the case, then S3FileIO can be used natively against GCS,
>> which wouldn't require the ResolvingFileIO (just supporting the GCS URI
>> schemes).
>>
>>
>>
>> This is new to me and I haven't tested this, but Mayur, if this does
>> work, please share how you configured S3FileIO.
>>
>>
>>
>> -Dan
>>
>>
>>
>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <yezhao...@gmail.com> wrote:
>>
>> We are in the process of supporting multiple file system schemes using
>> ResolvingFileIO; Ryan just added the initial implementation:
>> https://github.com/apache/iceberg/pull/3593
>>
>>
>>
>> -Jack
>>
>>
>>
>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
>> mayur.srivast...@twosigma.com> wrote:
>>
>> Thanks Ryan.
>>
>>
>>
>> I’m looking at the heap dump. From a preliminary look in jvisualvm, I see
>> the following top two objects:
>>
>> 1. ‘byte[]’: 87% of memory usage (>100k instances with a total of 3.2 GB
>> in one of my tests). I checked some of the references and found that they
>> come from
>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader
>> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
>> coming from WriterBasedJsonGenerator, the finalizer
>> (HadoopPositionOutputStream), etc. as well. I’m not familiar with this
>> code, but is it possible that Hadoop output streams are not closed and
>> close is only called by the finalizers?
>>
>> 2. ‘int[]’: 12% usage (7k instances), but I can’t expand the references.
>>
>>
>>
>> One interesting finding is that if I switch to the S3FileIO, the high
>> memory usage goes away and the memory usage is similar to that of the
>> serialized commits using a lock, which is ~750 MB for 128 parallel
>> committers. The ~750 MB usage may fall in line with the snapshots and
>> manifest* objects.
>>
>>
>>
>> So, the high memory problem manifests only when using the default
>> HadoopFileSystem.
>>
>>
>>
>> Thanks, Mayur
>>
>>
>>
>> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
>> works with GCS. Is there a plan to support gs:// prefix in the S3URI?
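>>
>> For illustration only, a hypothetical set of catalog properties that might
>> point S3FileIO at GCS's S3-interoperable XML endpoint with HMAC keys (the
>> exact property keys and gs:// handling depend on the Iceberg version; the
>> key/secret values are placeholders):
>>
>> Map<String, String> props = ImmutableMap.of(
>>     "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",
>>     "s3.endpoint", "https://storage.googleapis.com",  // GCS XML/interop API
>>     "s3.access-key-id", "<GCS HMAC access key>",
>>     "s3.secret-access-key", "<GCS HMAC secret>");
>> catalog.initialize("my_catalog", props);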
>>
>>
>>
>> *From:* Ryan Blue <b...@tabular.io>
>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>> *Subject:* Re: High memory usage with highly concurrent committers
>>
>>
>>
>> Mayur,
>>
>>
>>
>> Is it possible to connect to this process with a profiler and look at
>> what's taking up all of the space?
>>
>>
>>
>> I suspect that what's happening here is that you're loading the list of
>> snapshots for each version of metadata, so you're holding a lot of copies
>> of the entire snapshot history and possibly caching the list of manifests
>> for some snapshots as well.
>>
>>
>>
>> I've thought about adding a way to avoid parsing and loading snapshots,
>> probably by passing a cache when loading metadata so that all the copies of
>> a table can share snapshots in memory. That would work fine because they're
>> immutable. That might help you here, although a Snapshot instance will
>> cache manifests after loading them if they are accessed, so you'd want to
>> watch out for that as well.
>>
>>
>>
>> The best step forward is to get an idea of what objects are taking up
>> that space with a profiler or heap dump if you can.
>>
>>
>>
>> Ryan
>>
>>
>>
>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
>> mayur.srivast...@twosigma.com> wrote:
>>
>> Hi Iceberg Community,
>>
>>
>>
>> I’m running some experiments with high commit contention (on the same
>> Iceberg table, writing to different partitions) and I'm observing very
>> high memory usage (5 GB to 7 GB). (Note that the data being written is
>> very small.)
>>
>>
>>
>> *The scenario is described below:*
>>
>>
>>
>> *Note1: The catalog used is similar to the JDBC catalog.*
>>
>> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk to
>> S3.*
>>
>> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
>> branch.*
>>
>>
>>
>> *Experiment params:*
>>
>> a. NT = 64 = number of parallel committers. Achieved using multiple
>> threads within the same process.
>>
>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>>
>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>>
>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
>> enough retries are done so that all committers finish successfully).
>>
>>
>>
>> *Steps:*
>>
>>
>>
>> * *Create an Iceberg table *with three columns: time (timestamp without
>> timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>>
>> * Sequential step: create *NT different AppendFiles* objects.
>>
>> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month)
>> and append the DataFile to the corresponding AppendFiles. Basically, we
>> create one Parquet file per month (i.e. per partition) containing a single
>> row. This is done to keep the data size small for the experiment. It also
>> ensures that each commit will contain a different partition.
>>
>> * *Parallel commit step*: Create a ThreadPool of NT threads, submit a
>> Runnable which calls *AppendFiles.commit()*, and get the Future, i.e. run
>> the commits in parallel.
>>
>> * Wait for all Futures to finish.
>>
>> I ran this experiment with various values for params. For example, I
>> varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in
>> (8, 16, 32, 64, 128). Code snippets can be found below.
>>
>>
>>
>> *Observations:*
>>
>> A. The total elapsed commit time increases with the number of committers,
>> which is expected. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total
>> elapsed commit time is more than 250 s. This is acceptable given the nature
>> of OCC under high concurrency.
>>
>> B. The number of table metadata files is a multiple of the number of
>> committers. E.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number
>> of table metadata JSON files was 380. This is acceptable given the nature
>> of OCC under high concurrency.
>>
>> *C. The memory usage keeps shooting up periodically to 7 GB in some
>> experiments. This is noticeable (i.e. memory usage > 1 GB) when the number
>> of concurrent committers is >= 16 and becomes worse as the number of
>> committers increases. I’ve not investigated further, but it could be that
>> the in-memory metadata (snapshots, etc.) is growing very large. If I
>> serialize the commit attempts (e.g. by acquiring a lock), the high memory
>> usage problem goes away. But I wanted to check here before trying out any
>> alternative.*
>>
>>
>>
>> *Why is concurrent commit important to us?*
>>
>> We have several users who use various processing engines to schedule
>> their writes (into non-overlapping partitions) through a data service that
>> takes care of writing and committing the data. In many cases, they end up
>> in the high commit contention scenario described above. My main worry here
>> is that this is happening for a single table; if we have multiple tables
>> being committed, the memory usage will be much larger.
>>
>>
>>
>> *Questions:*
>>
>> 1. Have others observed this behavior? Is the high memory usage expected,
>> or am I doing something wrong? Is there any way to reduce the memory
>> footprint (e.g. by changing some metadata config) during the commit?
>>
>> 2. What is the general recommendation for highly concurrent committers?
>> Are highly concurrent committers an anti-pattern for Iceberg?
>>
>>
>>
>> Thanks,
>>
>> Mayur
>>
>> *Code snippets:*
>>
>>
>>
>> Schema schema = new Schema(
>>
>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>
>>     NestedField.of(2, false, "id", IntegerType.get()),
>>
>>     NestedField.of(3, false, "value", DoubleType.get())
>>
>> );
>>
>>
>>
>> catalog.createTable(
>>
>>     tableIdentifier,
>>
>>     schema,
>>
>>     PartitionSpec.builderFor(schema).month("time").build(),
>>
>>     ImmutableMap.of(
>>
>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>
>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>>
>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>>
>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>
>>     ) // properties
>>
>> );
>>
>>
>>
>> // Write data phase.
>>
>> List<AppendFiles> appendFilesList = new ArrayList<>();
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     appendFilesList.add(table.newAppend());
>>
>> }
>>
>>
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     LocalDateTime time =
>>         LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>>
>>     ImmutableList<GenericRecord> records =
>>         ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>
>>     writeRecords(table, records)
>>         .forEach(appendFilesList.get(m)::appendFile);
>>
>> }
>>
>>
>>
>> // Commit phase.
>>
>> // High memory usage starts from the commit phase.
>>
>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>>
>> List<Future<?>> futures = new ArrayList<>();
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     final int i = m;
>>
>>     futures.add(executors.submit(() -> {
>>
>>         appendFilesList.get(i).commit();
>>
>>     }));
>>
>> }
>>
>>
>>
>> for (int m = 0; m < NT; m++) {
>>
>>     futures.get(m).get();
>>
>> }
>>
>>
>>
>> executors.shutdownNow();
>>
>>
>>
>> // snippet of writeRecords().
>>
>> private static List<DataFile> writeRecords(
>>         Table table, List<GenericRecord> records) throws IOException {
>>
>>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>>
>>     try (var writer = new PartitionedWriterImpl(table)) {
>>
>>         for (var record : records) {
>>
>>             writer.write(record);
>>
>>         }
>>
>>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>
>>     }
>>
>> }
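>>
>> For completeness, a hypothetical sketch of the createRecord() helper used
>> above (it is not part of the original snippets), assuming
>> org.apache.iceberg.data.GenericRecord:
>>
>> private static GenericRecord createRecord(
>>         Schema schema, LocalDateTime time, int id, double value) {
>>     GenericRecord record = GenericRecord.create(schema);
>>     record.setField("time", time);
>>     record.setField("id", id);
>>     record.setField("value", value);
>>     return record;
>> }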
>>
>>
>>
>> Following is the heap usage for one of the experiments, where we can see
>> very high heap usage. The initial low-usage part is the data writes; the
>> high heap usage starts with the commit phase.
>>
>>
>>
>>
>> --
>>
>> Ryan Blue
>>
>> Tabular
>>
>>
