Hi Igor, does fs.gs.outputstream.upload.chunk.size affect the size of the files I can upload? Can I upload, e.g., a 1 GB Parquet file while also setting fs.gs.outputstream.upload.chunk.size=8388608 (8 MiB)?

Best,
PF
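As Igor describes further down in the thread, this property sizes the in-memory buffer that the GCS connector allocates per open output stream (~64 MiB by default), so as far as this thread goes it is a memory/performance knob rather than a cap on object size; nothing here suggests it limits how large a file can be written. A minimal sketch, assuming the GCS connector is on the classpath, of setting the property on the Hadoop Configuration used to create the FileSystem (the bucket name is a placeholder):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class GcsChunkSizeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 8 MiB upload buffer per open output stream (~64 MiB by default, per Igor below).
        conf.setInt("fs.gs.outputstream.upload.chunk.size", 8 * 1024 * 1024);
        // "my-bucket" is a placeholder; resolving gs:// requires the GCS connector jar.
        FileSystem fs = FileSystem.get(URI.create("gs://my-bucket/"), conf);
        System.out.println("FileSystem for " + fs.getUri() + ": " + fs.getClass().getName());
      }
    }

The same value can also be set as a plain Hadoop property (fs.gs.outputstream.upload.chunk.size=8388608), as in Igor's example quoted below.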
On Fri, Dec 3, 2021 at 5:33 PM Igor Dvorzhak <i...@google.com.invalid> wrote:

> No, right now this is a global property for the Hadoop FS instance. You either need to use different clients/Hadoop FS instances to write different files, or switch to the direct upload mode (fs.gs.outputstream.direct.upload.enable=true), which could be better for your use case: in that write mode nothing is cached in memory and data is streamed directly to GCS, but it does not allow failed upload retries, so its suitability depends on the Parquet file sizes that you write.

> Also, you may want to test how critical the 64 MiB buffer size is for your application; it may be the case that 16 MiB, for example, gets you the desired performance for Parquet file writes with good enough memory consumption.

> But on a broader note, this seems to be one of the reasons why it could be good to have a specialized Iceberg GcsFileIO: if the Iceberg API allows, it could have separate write configurations optimized for metadata and data files.

> On Fri, Dec 3, 2021 at 6:24 AM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

>> Thanks Igor. This may help mitigate the problem.

>> But it looks like it applies to all files. We still want data (Parquet) files to allocate 64 MiB (which seems reasonable); for metadata, a smaller size is better. Is there a way to set the property based on file suffix or file type?

>> Thanks,
>> Mayur

>> *From:* Igor Dvorzhak <i...@google.com.INVALID>
>> *Sent:* Thursday, December 2, 2021 8:09 PM
>> *To:* dev@iceberg.apache.org
>> *Subject:* Re: High memory usage with highly concurrent committers

>> For each written object the GCS connector allocates ~64 MiB of memory by default to improve the performance of large object writes. If you want to reduce memory utilization in cases where you write many files at once, you just need to reduce the upload chunk size, for example to 8 MiB: fs.gs.outputstream.upload.chunk.size=8388608

>> On Wed, Dec 1, 2021 at 3:20 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

>> That is correct, Daniel.

>> I’ve tried to explain our use of S3FileIO with GCS in the “Supporting gs:// prefix …” thread.

>> Thanks,
>> Mayur

>> *From:* Daniel Weeks <daniel.c.we...@gmail.com>
>> *Sent:* Wednesday, December 1, 2021 11:46 AM
>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>> *Subject:* Re: High memory usage with highly concurrent committers

>> I feel like what Mayur was saying is that S3FileIO actually works with GCS (it appears there is an S3-compatible API for GCS).

>> If that is the case, then S3FileIO can be used natively against GCS, which wouldn't require the ResolvingFileIO (just support for the GCS URI schemes).

>> This is new to me and I haven't tested it, but Mayur, if this does work, please share how you configured S3FileIO.

>> -Dan

>> On Wed, Dec 1, 2021 at 12:40 AM Jack Ye <yezhao...@gmail.com> wrote:

>> We are in the process of supporting multiple file system schemes using ResolvingFileIO; Ryan just added the initial implementation: https://github.com/apache/iceberg/pull/3593

>> -Jack
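Mayur's actual S3FileIO-on-GCS configuration is not shared in this thread, so the following is only a hypothetical sketch of the kind of catalog properties that would be involved; the property keys are assumed from Iceberg's AWS module, and the endpoint/credential values are placeholders (GCS's S3-compatible XML API uses HMAC keys):

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;

    public class S3FileIOOnGcsSketch {
      public static void main(String[] args) {
        // Hypothetical catalog properties for pointing S3FileIO at a non-AWS endpoint.
        Map<String, String> props = ImmutableMap.of(
            "io-impl", "org.apache.iceberg.aws.s3.S3FileIO",   // assumed property key
            "s3.endpoint", "https://storage.googleapis.com",   // placeholder endpoint
            "s3.access-key-id", "<hmac-access-id>",            // placeholder credential
            "s3.secret-access-key", "<hmac-secret>");          // placeholder credential
        props.forEach((key, value) -> System.out.println(key + "=" + value));
      }
    }

Whether this works end to end against gs:// URIs is exactly the open question Mayur raises below (he had to patch S3URI locally to accept the gs:// prefix).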
>> On Tue, Nov 30, 2021 at 6:41 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

>> Thanks Ryan.

>> I’m looking at the heap dump. At a preliminary look in jvisualvm, I see the following top two objects:

>> 1. ‘byte[]’: 87% of memory usage (>100k instances with a total of 3.2 GB in one of my tests). I checked some of the references and found that they come from com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader <- GoogleHadoopOutputStream <- AvroFileAppender. I also see references coming from WriterBasedJsonGenerator, finalizers (HadoopPositionOutputStream), etc. I’m not familiar with this code, but is it possible that Hadoop output streams are not closed and close is only called by the finalizers?

>> 2. ‘int[]’: 12% of memory usage (7k instances), but I can’t expand the references.

>> One interesting finding is that if I switch to S3FileIO, the high memory usage goes away and memory usage is similar to serializing the commits with a lock, which is ~750 MB for 128 parallel committers. The 750 MB usage may fall in line with the snapshots and manifest* objects.

>> So, the high memory problem manifests only when using the default HadoopFileSystem.

>> Thanks, Mayur

>> PS: I had to change S3FileIO locally to accept the gs:// prefix so that it works with GCS. Is there a plan to support the gs:// prefix in S3URI?

>> *From:* Ryan Blue <b...@tabular.io>
>> *Sent:* Tuesday, November 30, 2021 3:53 PM
>> *To:* Iceberg Dev List <dev@iceberg.apache.org>
>> *Subject:* Re: High memory usage with highly concurrent committers

>> Mayur,

>> Is it possible to connect to this process with a profiler and look at what's taking up all of the space?

>> I suspect that what's happening here is that you're loading the list of snapshots for each version of metadata, so you're holding a lot of copies of the entire snapshot history and possibly caching the list of manifests for some snapshots as well.

>> I've thought about adding a way to avoid parsing and loading snapshots, probably by passing a cache when loading metadata so that all the copies of a table can share snapshots in memory. That would work fine because they're immutable. That might help you here, although a Snapshot instance will cache manifests after loading them if they are accessed, so you'd want to watch out for that as well.

>> The best step forward is to get an idea of what objects are taking up that space with a profiler or heap dump if you can.

>> Ryan
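One way to capture the heap dump Ryan suggests, without attaching an external profiler, is to trigger it from inside the test process via the JVM's standard HotSpotDiagnostic MXBean. A minimal sketch; the output path is a placeholder and the class/method names are illustrative:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import com.sun.management.HotSpotDiagnosticMXBean;

    public class HeapDumpHelper {
      // Writes an .hprof dump of live objects to the given path; call it e.g. right
      // after the commit phase, then open the file in jvisualvm or Eclipse MAT.
      public static void dumpHeap(String path) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
            server, "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, /* live objects only */ true);
      }

      public static void main(String[] args) throws Exception {
        dumpHeap("/tmp/commit-phase.hprof"); // placeholder path
      }
    }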
>> On Tue, Nov 30, 2021 at 12:34 PM Mayur Srivastava <mayur.srivast...@twosigma.com> wrote:

>> Hi Iceberg Community,

>> I’m running some experiments with high commit contention (on the same Iceberg table, writing to different partitions) and I'm observing very high memory usage (5 GB to 7 GB). Note that the data being written is very small.

>> *The scenario is described below:*

>> *Note 1: The catalog used is similar to the JDBC catalog.*
>> *Note 2: The data is stored on S3 and HadoopFileSystem is used to talk to S3.*
>> *Note 3: The Iceberg code is ~6 months old. I haven’t tried the latest main branch.*

>> *Experiment params:*
>> a. NT = 64 = number of parallel committers. Achieved using multiple threads within the same process.
>> b. minWait = COMMIT_MIN_RETRY_WAIT_MS
>> c. maxWait = COMMIT_MAX_RETRY_WAIT_MS
>> d. numRetries = COMMIT_NUM_RETRIES = 256 (I increased this to make sure enough retries are done so that all committers finish successfully).

>> *Steps:*
>> * Create an Iceberg table with three columns: time (timestamp without timezone), id (int32), value (float64). The partition spec is (time, MONTH).
>> * Sequential step: create NT different AppendFiles objects.
>> * Sequential write step: for 1 to NT, write 1 row (in a unique month) and append the DataFile to the corresponding AppendFiles. Basically, we create one Parquet file per month (i.e., per partition) containing a single row. This is done to keep the data size small for the experiment. It also ensures that each commit contains a different partition.
>> * Parallel commit step: create a thread pool of NT threads, submit a Runnable which calls AppendFiles.commit(), and get the Future. I.e., run the commits in parallel.
>> * Wait for all Futures to finish.

>> I ran this experiment with various values for the params. For example, I varied minWait from 100 ms to 5 seconds, maxWait from 1 s to 60 s, and NT in (8, 16, 32, 64, 128). Code snippets can be found below.

>> *Observations:*
>> A. Total elapsed commit time increases with the number of committers, which is expected. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the total elapsed commit time is more than 250 s. This is acceptable given the nature of OCC under high concurrency.
>> B. The number of table metadata files is a multiple of the number of committers. E.g., with NT=64, minWait=100 ms, maxWait=10 s, the total number of table metadata JSON files was 380. This is acceptable given the nature of OCC under high concurrency.
>> *C. The memory usage keeps shooting up periodically, to 7 GB in some experiments. This is noticeable (i.e., memory usage > 1 GB) when the number of concurrent committers is >= 16, and it becomes worse as the number of committers increases. I’ve not investigated further, but it could be that the in-memory metadata (snapshots, etc.) is growing very large. If I serialize the commit attempts (e.g., by acquiring a lock), the high memory usage problem goes away (see the sketch after the questions below). But I wanted to check here before trying out any alternative.*

>> *Why is the concurrent commit important to us?*
>> We have several users who use various processing engines to schedule their writes (into non-overlapping partitions) through a data service that takes care of writing and committing the data. In many cases, they end up in the high commit contention scenario described above. My main worry is that this is happening for a single table; if we have multiple tables being committed, the memory usage will be much larger.

>> *Questions:*
>> 1. Have others observed this behavior? Is the high memory usage expected, or am I doing something wrong? Is there any way to reduce the memory footprint (e.g., by changing some metadata config) during the commit?
>> 2. What is the general recommendation for highly concurrent committers? Are highly concurrent committers an anti-pattern for Iceberg?
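For comparison with observation C above, here is a minimal sketch (not from the original message) of the lock-serialized variant: the same thread-pool shape as in the code snippets that follow, but every commit() funneled through one shared lock so that only a single OCC attempt runs at a time. NT and appendFilesList are assumed to be the same variables used in those snippets:

    // Serialized commit phase: same executor shape, but commits take a shared lock.
    Object commitLock = new Object();
    ExecutorService executors = Executors.newFixedThreadPool(NT);
    List<Future<?>> futures = new ArrayList<>();
    for (int m = 0; m < NT; m++) {
      final int i = m;
      futures.add(executors.submit(() -> {
        synchronized (commitLock) { // only one commit attempt at a time
          appendFilesList.get(i).commit();
        }
      }));
    }
    for (int m = 0; m < NT; m++) {
      futures.get(m).get(); // surfaces any commit failure
    }
    executors.shutdown();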
>> Thanks,
>> Mayur

>> *Code snippets:*

>> Schema schema = new Schema(
>>     NestedField.of(1, false, "time", TimestampType.withoutZone()),
>>     NestedField.of(2, false, "id", IntegerType.get()),
>>     NestedField.of(3, false, "value", DoubleType.get())
>> );
>>
>> catalog.createTable(
>>     tableIdentifier,
>>     schema,
>>     PartitionSpec.builderFor(schema).month("time").build(),
>>     ImmutableMap.of(
>>         TableProperties.COMMIT_NUM_RETRIES, String.valueOf(256),
>>         TableProperties.COMMIT_MIN_RETRY_WAIT_MS, String.valueOf(minWait),
>>         TableProperties.COMMIT_MAX_RETRY_WAIT_MS, String.valueOf(maxWait),
>>         "write.metadata.previous-versions-max", String.valueOf(1)
>>     ) // properties
>> );
>>
>> // Write data phase.
>> List<AppendFiles> appendFilesList = new ArrayList<>();
>> for (int m = 0; m < NT; m++) {
>>     appendFilesList.add(table.newAppend());
>> }
>>
>> for (int m = 0; m < NT; m++) {
>>     LocalDateTime time = LocalDateTime.of(2021, 1, 1, 0, 0).plus(m, ChronoUnit.MONTHS);
>>     ImmutableList<GenericRecord> records = ImmutableList.of(createRecord(schema, time, 1, 10.0));
>>     writeRecords(table, records).forEach(appendFilesList.get(m)::appendFile);
>> }
>>
>> // Commit phase.
>> // High memory usage starts from the commit phase.
>> ExecutorService executors = Executors.newFixedThreadPool(NT);
>> List<Future<?>> futures = new ArrayList<>();
>> for (int m = 0; m < NT; m++) {
>>     final int i = m;
>>     futures.add(executors.submit(() -> {
>>         appendFilesList.get(i).commit();
>>     }));
>> }
>>
>> for (int m = 0; m < NT; m++) {
>>     futures.get(m).get();
>> }
>>
>> executors.shutdownNow();
>>
>> // Snippet of writeRecords().
>> private static List<DataFile> writeRecords(Table table, List<GenericRecord> records)
>>         throws IOException {
>>     // PartitionedWriterImpl extends PartitionedWriter<Record>
>>     try (var writer = new PartitionedWriterImpl(table)) {
>>         for (var record : records) {
>>             writer.write(record);
>>         }
>>         return Arrays.stream(writer.dataFiles()).collect(Collectors.toList());
>>     }
>> }

>> Following is the heap usage for one of the experiments, where we can see very high heap usage. The initial low-usage part is the data writes; the high heap usage starts with the commit phase.

>> --
>> Ryan Blue
>> Tabular