Igor, you might want to take a look at the recently added GCSFileIO:
https://github.com/apache/iceberg/pull/3711/files

It would be great to get some additional feedback on that!

On Tue, Dec 14, 2021 at 8:28 PM Igor Dvorzhak <i...@google.com.invalid>
wrote:

> Yes, you still can upload files of any size, this property just configures
> the amount of data that first cached in-memory and subsequently sent to GCS
> in one HTTP request during resumable upload
> <https://cloud.google.com/storage/docs/resumable-uploads> session.
>
> On Mon, Dec 6, 2021 at 4:50 AM Piotr Findeisen <pi...@starburstdata.com>
> wrote:
>
>> Hi
>>
>> Igor, does fs.gs.outputstream.upload.chunk.size affect the file size I
>> can upload?
>> Can i upload e.g. 1GB Parquet file, while also setting
>> fs.gs.outputstream.upload.chunk.size=8388608 (8MB / MiB)?
>>
>> Best
>> PF
>>
>>
>> On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <i...@google.com.invalid>
>> wrote:
>>
>>> No, right now this is a global property for the Hadoop FS instance. You
>>> either need to use different clients/Hadoop FS instances to write different
>>> files or switch to the direct upload mode (
>>> fs.gs.outputstream.direct.upload.enable=true), which could be better
>>> for your use case (in this write mode nothing cached in the memory and
>>> streamed to GCS directly, but it does not allow failed upload retries),
>>> depending on the parquet file sizes that you write.
>>>
>>> Also, you may want to test how critical 64MiB buffer size is for your
>>> application, it may be the case that 16MiB, for example, will get you
>>> desired performance for parquet file writes and good enough memory
>>> consumption.
>>>
>>> But on a broader note this seems to be one of the reasons why it could
>>> be good to have specialized Iceberg GcsFileIO, if Iceberg API allows, it
>>> can have separate write configuration optimized for metadata and data files.
>>>
>>> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <
>>> mayur.srivast...@twosigma.com> wrote:
>>>
>>>> Thanks Igor. This may help mitigate the problem.
>>>>
>>>>
>>>>
>>>> But it looks like it applies to all files. We still want data (parquet)
>>>> files to allocate 64 MiB (seems reasonable). For metadata, a smaller size
>>>> is better. Is there a way to set the property based on file suffix or file
>>>> type?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mayur
>>>>
>>>>
>>>>
>>>> *From:* Igor Dvorzhak <i...@google.com.INVALID>
>>>> *Sent:* Thursday, December 2, 2021 8:09 PM
>>>> *To:* dev@iceberg.apache.org
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> For each written object GCS connector allocates ~64MiB of memory by
>>>> default to improve performance of large object writes. If you want to
>>>> reduce memory utilization in cases when you write many files at once you
>>>> just need to reduce upload chunk size to 8MiB, for example:
>>>> fs.gs.outputstream.upload.chunk.size=8388608
>>>>
>>>>
>>>>
>>>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <
>>>> mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> That is correct Daniel.
>>>>
>>>>
>>>>
>>>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting
>>>> gs:// prefix …” thread.
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mayur
>>>>
>>>>
>>>>
>>>> *From:* Daniel Weeks <daniel.c.we...@gmail.com>
>>>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>>>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> I feel like what Mayur was saying is that S3FileIO actually works with
>>>> GCS (it appears there is some S3 compatible API for GCS).
>>>>
>>>>
>>>>
>>>> If that is the case, then S3FileIO can be used natively against GCS,
>>>> which wouldn't require the ResolvingRileIO (just supporting the GCS URI
>>>> schemes).
>>>>
>>>>
>>>>
>>>> This is new to me and I haven't tested this, but Mayur, if this does
>>>> work, please share how you configured S3FileIO.
>>>>
>>>>
>>>>
>>>> -Dan
>>>>
>>>>
>>>>
>>>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <yezhao...@gmail.com> wrote:
>>>>
>>>> We are in the process of supporting multiple file system schemes using
>>>> ResolvingFileIO, Ryan just added the initial implementation:
>>>> https://github.com/apache/iceberg/pull/3593
>>>>
>>>>
>>>>
>>>> -Jack
>>>>
>>>>
>>>>
>>>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <
>>>> mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> Thanks Ryan.
>>>>
>>>>
>>>>
>>>> I’m looking at the heapdump. At a preliminary look in jvisualvm, I see
>>>> the following top two objects:
>>>>
>>>> 1.      ‘byte[]’ : 87% of memory usage, (>100k instances with a total
>>>> of 3.2G in one of my tests). I checked some of the reference and find that
>>>> they are from
>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.
>>>>  MediaHttpUploader
>>>> <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references
>>>> coming from WriterBasedJsonGenerator, finalizer
>>>> (HadoopPositionOutputStream), etc as well. I’m not familiar with this code,
>>>> but is it possible that Hadoop output streams are not closed and close is
>>>> called the finalizers?
>>>>
>>>> 2.      ‘int[]’ : 12% usage (7k instances), but I can’t expand the
>>>> references.
>>>>
>>>>
>>>>
>>>> One interesting finding is that if I switch to the S3FileIO, the high
>>>> memory usage goes away and the memory usage is similar to the serialized
>>>> commits using a lock which is ~750 M for 128 parallel committers. And the
>>>> 750 M usage may fall-in line with the snapshots and manifest* objects.
>>>>
>>>>
>>>>
>>>> So, the high memory problem manifests only when using the default
>>>> HadoopFileSystem.
>>>>
>>>>
>>>>
>>>> Thanks, Mayur
>>>>
>>>>
>>>>
>>>> PS: I had to change S3FileIO locally to accept gs:// prefix so that it
>>>> works with GCS. Is there a plan to support gs:// prefix in the S3URI?
>>>>
>>>>
>>>>
>>>> *From:* Ryan Blue <b...@tabular.io>
>>>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>>>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>>
>>>>
>>>> Mayur,
>>>>
>>>>
>>>>
>>>> Is it possible to connect to this process with a profiler and look at
>>>> what's taking up all of the space?
>>>>
>>>>
>>>>
>>>> I suspect that what's happening here is that you're loading the list of
>>>> snapshots for each version of metadata, so you're holding a lot of copies
>>>> of the entire snapshot history and possibly caching the list of manifests
>>>> for some snapshots as well.
>>>>
>>>>
>>>>
>>>> I've thought about adding a way to avoid parsing and loading snapshots,
>>>> probably by passing a cache when loading metadata so that all the copies of
>>>> a table can share snapshots in memory. That would work fine because they're
>>>> immutable. That might help you here, although a Snapshot instance will
>>>> cache manifests after loading them if they are accessed, so you'd want to
>>>> watch out for that as well.
>>>>
>>>>
>>>>
>>>> The best step forward is to get an idea of what objects are taking up
>>>> that space with a profiler or heap dump if you can.
>>>>
>>>>
>>>>
>>>> Ryan
>>>>
>>>>
>>>>
>>>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <
>>>> mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> Hi Iceberg Community,
>>>>
>>>>
>>>>
>>>> I’m running some experiments with high commit contention (on the same
>>>> Iceberg table writing to different partitions) and I'm observing very high
>>>> memory usage (5G to 7G). (Note that the data being written is very small.)
>>>>
>>>>
>>>>
>>>> *The scenario is described below:*
>>>>
>>>>
>>>>
>>>> *Note1: The catalog used is similar to the JDBC catalog.*
>>>>
>>>> *Note2: The data is stored on S3 and HadoopFileSystem is used to talk
>>>> to S3.*
>>>>
>>>> *Note3: Iceberg code is ~6 months old. I haven’t tried the latest main
>>>> branch.*
>>>>
>>>>
>>>>
>>>> *Experiment params:*
>>>>
>>>> a. NT = 64 = number of parallel committers. Achieved using multiple
>>>> threads within the same process.
>>>>
>>>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>>>>
>>>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>>>>
>>>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure
>>>> enough retries are done so that all committers finish successfully).
>>>>
>>>>
>>>>
>>>> *Steps:*
>>>>
>>>>
>>>>
>>>> * *Create an Iceberg table *with three columns: time (timestamp
>>>> without timezone), id (int32), value (float64). The partition spec is
>>>> (time, MONTH).
>>>>
>>>> * Sequential step: create *NT different AppendFile* objects.
>>>>
>>>> * Sequential write step: for 1 to NT, *write 1 row* (in a unique
>>>> month) and append the DataFile to the corresponding AppendFile. Basically,
>>>> we create one parquet file per month (i.e. per partition) containing a
>>>> single row. This is done to keep data size small for the experiment. Also,
>>>> we ensure that each commit will contain a different partition.
>>>>
>>>> * *Parallel commit step*: Create a ThreadPool of NT threads, submit a
>>>> Runnable which calls *AppendFile.commit()*, and get the Future. I.e.
>>>> Run the commits in parallel.
>>>>
>>>> * Wait for all Futures to finish.
>>>>
>>>> I ran this experiment with various values for params. For example, I
>>>> varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, NT in
>>>> (8, 16, 32, 64, 128). Code snippets can be found below.
>>>>
>>>>
>>>>
>>>> *Observations:*
>>>>
>>>> A. Total elapsed commit time increases with the number of committers
>>>> which is expected. e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total
>>>> elapsed commit time is more than 250 s. This is acceptable given the nature
>>>> of OCC in high concurrency.
>>>>
>>>> B. The number of table metadata files is a multiple of the number of
>>>> committers, e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total table
>>>> metadata json files was 380. This is acceptable given the nature of OCC in
>>>> high concurrency.
>>>>
>>>> *C. The memory usage which keeps shooting-up  periodically to 7G in
>>>> some experiments. This is noticeable (i.e. memory usage > 1G) when number
>>>> of concurrent committers >= 16 and becomes worse when number of committers
>>>> increase. I’ve not investigated further but it could be that the in-memory
>>>> metadata (snapshots, etc.) is growing very large. If I serialize the commit
>>>> attempts (e.g. by acquiring a lock), the high memory usage problem goes
>>>> away. But, I wanted to check here before trying out any alternative.*
>>>>
>>>>
>>>>
>>>> *Why is the concurrent commit important to us?*
>>>>
>>>> We have several users who use various processing engines to schedule
>>>> their writes (into non-overlapping partitions) through a data service that
>>>> takes care of writing and committing the data. In many cases, they end up
>>>> in the high commit contention scenario as described above. My main worry
>>>> here is that this is happening for a single table, if we have multiple
>>>> tables being committed, the memory usage will be much larger.
>>>>
>>>>
>>>>
>>>> *Questions:  *
>>>>
>>>> 1.      Have others observed this behavior? Is the high memory usage
>>>> expected or am I doing something wrong? Is there any way to reduce the
>>>> memory footprint (e.g. by changing some metadata config) during the commit?
>>>>
>>>> 2.      What is the general recommendation for high concurrent
>>>> committers? Is high concurrent committers an anti-pattern for Iceberg?
>>>>
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Mayur
>>>>
>>>> *Code snippets:*
>>>>
>>>>
>>>>
>>>> Schema schema = new Schema(
>>>>
>>>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>>>
>>>>     NestedField.of(2, false, "id", IntegerType.get()),
>>>>
>>>>     NestedField.of(3, false, "value", DoubleType.get())
>>>>
>>>> );
>>>>
>>>>
>>>>
>>>> catalog.createTable(
>>>>
>>>>     tableIdentifier,
>>>>
>>>>     schema,
>>>>
>>>>     PartitionSpec.builderFor(schema).month("time").build(),
>>>>
>>>>     ImmutableMap.of(
>>>>
>>>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>>>
>>>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS,
>>>> String.valueOf(minWait),
>>>>
>>>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS,
>>>> String.valueOf(maxWait),
>>>>
>>>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>>>
>>>>     ) // properties
>>>>
>>>> );
>>>>
>>>>
>>>>
>>>> // Write data phase.
>>>>
>>>> List<AppendFiles> appendFilesList = new ArrayList<>();
>>>>
>>>> for (int m = 0; m < NT; m++) {
>>>>
>>>>     appendFilesList.add(table.newAppend());
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> for (int m = 0; m < NT; m++) {
>>>>
>>>>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m,
>>>> ChronoUnit.MONTHS);
>>>>
>>>>     ImmutableList<GenericRecord> records =
>>>> ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>>>
>>>>     writeRecords(table,
>>>> records).forEach(appendFilesList.get(m)::appendFile);
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> // Commit phase.
>>>>
>>>> // High memory usage starts from the commit phase.
>>>>
>>>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>>>>
>>>> List<Future<?>> futures = new ArrayList<>();
>>>>
>>>> for (int m = 0; m < NT; m++) {
>>>>
>>>>     final int i = m;
>>>>
>>>>     futures.add(executors.submit(() -> {
>>>>
>>>>         appendFilesList.get(i).commit();
>>>>
>>>>     }));
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> for (int m = 0; m < N; m++) {
>>>>
>>>>     futures.get(m).get();
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> executors.shutdownNow();
>>>>
>>>>
>>>>
>>>> // snippet of writeRecords().
>>>>
>>>> private static List<DataFile> writeRecords(Table table,
>>>> List<GenericRecord> records)
>>>>
>>>>         throws IOException {
>>>>
>>>>     // PartitionedWriterImpl extends extends PartitionedWriter<Record>
>>>>
>>>>     try (var writer = new PartitionedWriterImpl(table)) {
>>>>
>>>>         for (var record : records) {
>>>>
>>>>             writer.write(record);
>>>>
>>>>         }
>>>>
>>>>         return
>>>> Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>>>
>>>>     }
>>>>
>>>> }
>>>>
>>>>
>>>>
>>>> Following is the heap usage for one of the experiments where we can see
>>>> very high heap usage. The initial low usage part is data writes. The high
>>>> heap usage starts with the commit phase.
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Ryan Blue
>>>>
>>>> Tabular
>>>>
>>>>

-- 
Ryan Blue
Tabular

Reply via email to