Igor, you might want to take a look at the recently added GCSFileIO: https://github.com/apache/iceberg/pull/3711/files
It would be great to get some additional feedback on that!

On Tue, Dec 14, 2021 at 8:28 PM Igor Dvorzhak <i...@google.com.invalid> wrote:

> Yes, you still can upload files of any size; this property just configures the amount of data that is first cached in memory and subsequently sent to GCS in one HTTP request during a resumable upload <https://cloud.google.com/storage/docs/resumable-uploads> session.
>
> On Mon, Dec 6, 2021 at 4:50 AM Piotr Findeisen <pi...@starburstdata.com> wrote:
>
>> Hi
>>
>> Igor, does fs.gs.outputstream.upload.chunk.size affect the file size I can upload? Can I upload e.g. a 1 GB Parquet file while also setting fs.gs.outputstream.upload.chunk.size=8388608 (8 MiB)?
>>
>> Best
>> PF
>>
>> On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <i...@google.com.invalid> wrote:
>>
>>> No, right now this is a global property for the Hadoop FS instance. You either need to use different clients/Hadoop FS instances to write different files, or switch to the direct upload mode (fs.gs.outputstream.direct.upload.enable=true), which could be better for your use case depending on the Parquet file sizes that you write (in this write mode nothing is cached in memory and data is streamed to GCS directly, but it does not allow retrying failed uploads).
>>>
>>> Also, you may want to test how critical the 64 MiB buffer size is for your application; it may be that 16 MiB, for example, gives you the desired performance for Parquet file writes with good enough memory consumption.
>>>
>>> But on a broader note, this seems to be one of the reasons why it could be good to have a specialized Iceberg GcsFileIO; if the Iceberg API allows it, it could have separate write configurations optimized for metadata and data files.
>>>
>>> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>>>
>>>> Thanks Igor. This may help mitigate the problem.
>>>>
>>>> But it looks like it applies to all files. We still want data (Parquet) files to allocate 64 MiB (which seems reasonable). For metadata, a smaller size is better. Is there a way to set the property based on file suffix or file type?
>>>>
>>>> Thanks,
>>>> Mayur
>>>>
>>>> *From:* Igor Dvorzhak <i...@google.com.INVALID>
>>>> *Sent:* Thursday, December 2, 2021 8:09 PM
>>>> *To:* dev@iceberg.apache.org
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>> For each written object the GCS connector allocates ~64 MiB of memory by default to improve the performance of large object writes. If you want to reduce memory utilization in cases where you write many files at once, you just need to reduce the upload chunk size, e.g. to 8 MiB: fs.gs.outputstream.upload.chunk.size=8388608
>>>>
>>>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> That is correct, Daniel.
>>>>
>>>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting gs:// prefix …” thread.
>>>>
>>>> Thanks,
>>>> Mayur
>>>>
>>>> *From:* Daniel Weeks <daniel.c.we...@gmail.com>
>>>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>>>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>> I feel like what Mayur was saying is that S3FileIO actually works with GCS (it appears there is an S3-compatible API for GCS).
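As an aside on the fs.gs.* settings Igor mentions earlier in this thread, here is a minimal, untested sketch of applying them through a standard Hadoop Configuration before the FileSystem is created; the property names and the 8388608 value come from the thread, everything else is illustrative:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Buffer only 8 MiB per open output stream instead of the default ~64 MiB.
    conf.set("fs.gs.outputstream.upload.chunk.size", "8388608");
    // Or stream directly to GCS with no in-memory buffering at all
    // (note: failed uploads cannot be retried in this mode).
    // conf.set("fs.gs.outputstream.direct.upload.enable", "true");

As noted above, these are global, per-FileSystem-instance settings rather than per-file ones.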
>>>> If that is the case, then S3FileIO can be used natively against GCS, which wouldn't require the ResolvingFileIO (just supporting the GCS URI schemes).
>>>>
>>>> This is new to me and I haven't tested this, but Mayur, if this does work, please share how you configured S3FileIO.
>>>>
>>>> -Dan
>>>>
>>>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <yezhao...@gmail.com> wrote:
>>>>
>>>> We are in the process of supporting multiple file system schemes using ResolvingFileIO; Ryan just added the initial implementation: https://github.com/apache/iceberg/pull/3593
>>>>
>>>> -Jack
>>>>
>>>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> Thanks Ryan.
>>>>
>>>> I’m looking at the heap dump. At a preliminary look in jvisualvm, I see the following top two objects:
>>>>
>>>> 1. ‘byte[]’: 87% of memory usage (>100k instances with a total of 3.2 GB in one of my tests). I checked some of the references and found that they come from com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references coming from WriterBasedJsonGenerator, finalizers (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is it possible that Hadoop output streams are not closed and close is only called by the finalizers?
>>>>
>>>> 2. ‘int[]’: 12% of usage (7k instances), but I can’t expand the references.
>>>>
>>>> One interesting finding is that if I switch to S3FileIO, the high memory usage goes away and the memory usage is similar to that of commits serialized using a lock, which is ~750 MB for 128 parallel committers. The 750 MB usage may fall in line with the snapshot and manifest* objects.
>>>>
>>>> So, the high memory problem manifests only when using the default HadoopFileSystem.
>>>>
>>>> Thanks, Mayur
>>>>
>>>> PS: I had to change S3FileIO locally to accept the gs:// prefix so that it works with GCS. Is there a plan to support the gs:// prefix in S3URI?
>>>>
>>>> *From:* Ryan Blue <b...@tabular.io>
>>>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>>>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>>>> *Subject:* Re: High memory usage with highly concurrent committers
>>>>
>>>> Mayur,
>>>>
>>>> Is it possible to connect to this process with a profiler and look at what's taking up all of the space?
>>>>
>>>> I suspect that what's happening here is that you're loading the list of snapshots for each version of metadata, so you're holding a lot of copies of the entire snapshot history and possibly caching the list of manifests for some snapshots as well.
>>>>
>>>> I've thought about adding a way to avoid parsing and loading snapshots, probably by passing a cache when loading metadata so that all the copies of a table can share snapshots in memory. That would work fine because they're immutable. That might help you here, although a Snapshot instance will cache manifests after loading them if they are accessed, so you'd want to watch out for that as well.
>>>>
>>>> The best step forward is to get an idea of what objects are taking up that space with a profiler or heap dump if you can.
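If connecting a profiler is inconvenient, one way to grab a heap dump programmatically at the interesting moment (say, right after the commit phase) is the standard HotSpotDiagnosticMXBean; a minimal sketch, with the file name and call site being arbitrary choices:

    import com.sun.management.HotSpotDiagnosticMXBean;
    import java.io.IOException;
    import java.lang.management.ManagementFactory;

    static void dumpHeap(String path) throws IOException {
      HotSpotDiagnosticMXBean diagnostics = ManagementFactory.newPlatformMXBeanProxy(
          ManagementFactory.getPlatformMBeanServer(),
          "com.sun.management:type=HotSpotDiagnostic",
          HotSpotDiagnosticMXBean.class);
      // true = dump only live (reachable) objects.
      diagnostics.dumpHeap(path, true);
    }

For example, calling dumpHeap("commit-phase.hprof") after all commit futures complete and opening the file in jvisualvm gives the same per-class breakdown Mayur describes above.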
>>>> Ryan
>>>>
>>>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:
>>>>
>>>> Hi Iceberg Community,
>>>>
>>>> I’m running some experiments with high commit contention (on the same Iceberg table, writing to different partitions) and I'm observing very high memory usage (5 GB to 7 GB). (Note that the data being written is very small.)
>>>>
>>>> *The scenario is described below:*
>>>>
>>>> *Note 1: The catalog used is similar to the JDBC catalog.*
>>>> *Note 2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.*
>>>> *Note 3: The Iceberg code is ~6 months old. I haven’t tried the latest main branch.*
>>>>
>>>> *Experiment params:*
>>>> a. NT = 64 = number of parallel committers, achieved using multiple threads within the same process.
>>>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>>>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>>>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).
>>>>
>>>> *Steps:*
>>>> * *Create an Iceberg table* with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>>>> * Sequential step: create *NT different AppendFiles* objects.
>>>> * Sequential write step: for 1 to NT, *write 1 row* (in a unique month) and append the DataFile to the corresponding AppendFiles. Basically, we create one Parquet file per month (i.e. per partition) containing a single row. This keeps the data size small for the experiment. It also ensures that each commit touches a different partition.
>>>> * *Parallel commit step*: create a thread pool of NT threads, submit a Runnable which calls *AppendFiles.commit()*, and get the Future, i.e. run the commits in parallel.
>>>> * Wait for all Futures to finish.
>>>>
>>>> I ran this experiment with various values for the params. For example, I varied minWait from 100 ms to 5 s, maxWait from 1 s to 60 s, and NT in (8, 16, 32, 64, 128). Code snippets can be found below.
>>>>
>>>> *Observations:*
>>>> A. The total elapsed commit time increases with the number of committers, which is expected; e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.
>>>> B. The number of table metadata files is a multiple of the number of committers; e.g. with NT=64, minWait=100 ms, maxWait=10 s, the total number of table metadata JSON files was 380. This is acceptable given the nature of OCC under high concurrency.
>>>> *C. The memory usage keeps shooting up periodically, to 7 GB in some experiments. This is noticeable (i.e. memory usage > 1 GB) when the number of concurrent committers is >= 16 and becomes worse as the number of committers increases. I’ve not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g. by acquiring a lock), the high memory usage problem goes away.
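The lock-based workaround referred to here is just a process-wide lock around the commit call; a minimal sketch of what that could look like in the commit phase of the code snippets further below (commitLock is a hypothetical name, not an Iceberg API):

    import java.util.concurrent.locks.ReentrantLock;

    private static final ReentrantLock commitLock = new ReentrantLock();

    // In each committer task: serialize commits so that only one attempt
    // (and its retries) is in flight at a time.
    commitLock.lock();
    try {
      appendFilesList.get(i).commit();
    } finally {
      commitLock.unlock();
    }

A Semaphore with a few permits would be a middle ground, bounding how many committers run concurrently without fully serializing them.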
>>>> But, I wanted to check here before trying out any alternative.*
>>>>
>>>> *Why is concurrent commit important to us?*
>>>> We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry is that this is happening for a single table; if we have multiple tables being committed, the memory usage will be much larger.
>>>>
>>>> *Questions:*
>>>> 1. Have others observed this behavior? Is the high memory usage expected, or am I doing something wrong? Is there any way to reduce the memory footprint (e.g. by changing some metadata config) during the commit?
>>>> 2. What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?
>>>>
>>>> Thanks,
>>>> Mayur
>>>>
>>>> *Code snippets:*
>>>>
>>>> Schema schema = new Schema(
>>>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>>>     NestedField.of(2, false, "id", IntegerType.get()),
>>>>     NestedField.of(3, false, "value", DoubleType.get())
>>>> );
>>>>
>>>> catalog.createTable(
>>>>     tableIdentifier,
>>>>     schema,
>>>>     PartitionSpec.builderFor(schema).month("time").build(),
>>>>     ImmutableMap.of(
>>>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>>>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>>>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>>>     ) // properties
>>>> );
>>>>
>>>> // Write data phase.
>>>> List<AppendFiles> appendFilesList = new ArrayList<>();
>>>> for (int m = 0; m < NT; m++) {
>>>>     appendFilesList.add(table.newAppend());
>>>> }
>>>>
>>>> for (int m = 0; m < NT; m++) {
>>>>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>>>>     ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>>>     writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
>>>> }
>>>>
>>>> // Commit phase.
>>>> // High memory usage starts from the commit phase.
>>>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>>>> List<Future<?>> futures = new ArrayList<>();
>>>> for (int m = 0; m < NT; m++) {
>>>>     final int i = m;
>>>>     futures.add(executors.submit(() -> {
>>>>         appendFilesList.get(i).commit();
>>>>     }));
>>>> }
>>>>
>>>> for (int m = 0; m < NT; m++) {
>>>>     futures.get(m).get();
>>>> }
>>>>
>>>> executors.shutdownNow();
>>>>
>>>> // Snippet of writeRecords().
>>>> private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
>>>>         throws IOException {
>>>>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>>>>     try (var writer = new PartitionedWriterImpl(table)) {
>>>>         for (var record : records) {
>>>>             writer.write(record);
>>>>         }
>>>>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>>>     }
>>>> }
>>>>
>>>> Following is the heap usage for one of the experiments, where we can see very high heap usage. The initial low-usage part is the data writes. The high heap usage starts with the commit phase.
>>>>
>>>> --
>>>> Ryan Blue
>>>> Tabular

--
Ryan Blue
Tabular