Hi,


I've been running performance benchmarks of the core Iceberg readers on Google 
Cloud Storage (GCS). I would like to share some of my results and ask whether 
there are ways to improve performance on S3-like storage in general. The 
details (including sample code) follow the question section below.


I have a few questions about running Iceberg readers on S3-like storage:

1.      Are there published benchmarks for Iceberg on S3-like storage (GCS, 
Amazon S3, etc.)?

2.      Are there some configurations or code changes that improve performance 
(in terms of single reader latency and throughput) on S3-like storage?

3.      One of my first observations is that Iceberg's single-process reader 
throughput on GCS is considerably lower than on local disk. My initial guess is 
that the Iceberg read patterns generate many small requests to GCS, and each 
request to GCS has much higher latency than a local disk read. Has anyone 
looked into this reader slowness on S3-like storage before, and could you share 
some insights?

4.      In the benchmarks, I implemented a naive buffering strategy on top of 
GCS that buffers the whole file from GCS before running the deserialization 
(reducing the number of GCS requests per file to one). With this file system 
change, throughput improves again and is similar to the local disk version. Are 
there existing buffering implementations for GCS/S3-like file systems that the 
Iceberg community has been using?

5.      In the past, we implemented some optimizations for reading Apache Arrow 
datasets from S3-like storage and contributed parts of that work to the Apache 
Arrow C++ project. The details are discussed here: 
https://lists.apache.org/thread.html/r9c8d1081e02f479a09015c270364b6fb9e9fa7d9c0d4982f6afab531%40%3Cdev.arrow.apache.org%3E.
 Has there been any similar effort in the Iceberg project?


Benchmark data set

The benchmark data is a monthly partitioned time series table containing a 
timestamp column, a key column, and n double columns, where n = 50, 100, 200, 
300. Each monthly partition contains 1 parquet file with 180,000 uniformly 
distributed rows. The monthly parquet files are uncompressed; the size per 
parquet file (i.e. per partition) is 69.3 MiB, 138.02 MiB, 275.46 MiB, and 
412.91 MiB for n = 50, 100, 200, 300 respectively. The data is written for 
1 year, i.e. 12 partitions. The values in the double columns are randomly 
distributed.
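
For reference, here is a minimal sketch of how a table with this shape can be 
defined with the Iceberg API (the value column names, the key column type, and 
the table location are illustrative placeholders, not the exact benchmark code):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

int n = 50;  // also 100, 200, 300

// timestamp + key + n double columns
List<Types.NestedField> fields = new ArrayList<>();
fields.add(Types.NestedField.required(1, "time", Types.TimestampType.withZone()));
fields.add(Types.NestedField.required(2, "key", Types.StringType.get()));
for (int i = 0; i < n; i++) {
    fields.add(Types.NestedField.optional(3 + i, "value_" + i, Types.DoubleType.get()));
}
Schema schema = new Schema(fields);

// monthly partitioning on the timestamp column
PartitionSpec spec = PartitionSpec.builderFor(schema).month("time").build();

// placeholder location; the benchmark runs against a local path and a gs:// path
Table table = new HadoopTables(new Configuration())
    .create(schema, spec, "file:///tmp/benchmark/timeseries");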


Benchmark experiments

We benchmarked a row-by-row reader and an Arrow reader that run several kinds 
of queries on the Iceberg tables on local disk and GCS: read all, read 1 year, 
read 1 month, etc. Both readers use the Iceberg core libraries and do not 
depend on any other framework such as Spark. One of the queries of interest is 
reading one month of data, which effectively reads a single parquet file from a 
monthly partition. The Arrow reader is introduced in 
https://github.com/apache/iceberg/pull/2286 (still under review). In both 
readers, there is a warm-up phase and then each benchmark query is repeated 3 
times. We compute the total bytes read by summing the sizes of the Arrow 
vectors; this total is also used as a proxy for the row-by-row reads. The total 
bytes read is close to the parquet file size on disk because the experiment was 
performed with no compression. The single-reader throughput is computed from 
the total bytes read and the mean time taken to run the query.
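
For concreteness, the measurement is roughly the following (a sketch of the 
bookkeeping, not the exact benchmark harness; the class and method names here 
are mine):

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;

class ThroughputUtil {

    // Bytes held by the Arrow vectors of one batch; summed over all batches
    // this gives the total bytes read (also used as a proxy for the
    // row-by-row reads).
    static long batchBytes(VectorSchemaRoot root) {
        long bytes = 0;
        for (FieldVector vector : root.getFieldVectors()) {
            bytes += vector.getBufferSize();
        }
        return bytes;
    }

    // Throughput in MiB/s from the total bytes read and the mean query time
    // in seconds.
    static double throughputMiBPerSec(long totalBytes, double meanSeconds) {
        return (totalBytes / (1024.0 * 1024.0)) / meanSeconds;
    }
}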


The benchmark was run on a 16-CPU/58 GB Google Cloud machine.


Benchmark reader code


Row-by-row reader code:


// Filter the scan to the requested time range on the timestamp column "time".
Expression where = Expressions.and(
    Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
    Expressions.lessThan("time", timestampToMicros(endExclusive)));

ScanBuilder builder = IcebergGenerics.read(table).where(where);

// Iterate over all matching rows; the loop body is empty because the benchmark
// only measures read time.
try (CloseableIterable<Record> rowReader = builder.build()) {
    for (Record row : rowReader) {
    }
}


Arrow reader code (using https://github.com/apache/iceberg/pull/2286):


// Same time-range filter as the row-by-row reader.
Expression where = Expressions.and(
    Expressions.greaterThanOrEqual("time", timestampToMicros(begin)),
    Expressions.lessThan("time", timestampToMicros(endExclusive)));

TableScan tableScan = table.newScan().filter(where);

// Read Arrow batches; the VectorSchemaRoot is what the benchmark uses to
// compute the total bytes read (sum of the Arrow vector sizes).
try (var iterable = new VectorizedTableScanIterable(tableScan)) {
    for (ArrowBatch batch : iterable) {
        VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
    }
}


Selected results for query = read 1 month:


Query: read 1 month of data, i.e. read 1 parquet file from a single partition.


Result using Local Disk as storage:


In the first round, I ran the benchmark on the data stored on a local disk. The 
throughput of the row-by-row reader is ~100 MiB/s and that of the Arrow reader 
is >300 MiB/s. The Arrow reader is significantly faster than the row-by-row 
reader.


Row-by-row reader:

n=50 read throughput=96.983 MiB/s

n=100 read throughput=98.709 MiB/s

n=200 read throughput=105.242 MiB/s

n=300 read throughput=85.681 MiB/s


Arrow reader:

n=50 read throughput=432.107 MiB/s

n=100 read throughput=396.876 MiB/s

n=200 read throughput=327.403 MiB/s

n=300 read throughput=325.542 MiB/s


Result using GCS as storage and GoogleHadoopFileSystem in the Hadoop configuration:


In the second round, I ran the benchmark on the data stored on GCS. The 
throughput of the row-by-row reader falls to 25-40 MiB/s and the Arrow reader 
falls to 35-60 MiB/s. This is a big drop in single-reader throughput. It seems 
that the read patterns generate many seeks and read requests against GCS.


Config code:

conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName())
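
For reference, the rest of the wiring looks roughly like this (simplified 
sketch; the project id, auth settings, and table location are placeholders, 
and loading via HadoopTables is just one way to do it):

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

Configuration conf = new Configuration();
// Map the gs:// scheme to the GCS connector.
conf.set("fs.gs.impl", GoogleHadoopFileSystem.class.getName());
// Placeholder project/auth settings (standard gcs-connector properties).
conf.set("fs.gs.project.id", "my-project");
conf.set("google.cloud.auth.service.account.enable", "true");

// Load the Iceberg table directly from its gs:// location (placeholder path).
Table table = new HadoopTables(conf).load("gs://my-bucket/benchmark/timeseries");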


Row-by-row reader:

n=50 read throughput=37.515 MiB/s

n=100 read throughput=38.745 MiB/s

n=200 read throughput=32.282 MiB/s

n=300 read throughput=25.943 MiB/s


Arrow reader:

n=50 read throughput=55.272 MiB/s

n=100 read throughput=59.164 MiB/s

n=200 read throughput=51.068 MiB/s

n=300 read throughput=34.020 MiB/s


Result using GCS as storage and a naive buffering layer on top of 
GoogleHadoopFileSystem in the Hadoop configuration:


Config code (experimental/not committed):

conf.set("fs.gs.impl", InMemoryLoadingFileSystem.class.getName())


I implemented a naive buffering strategy on top of GoogleHadoopFileSystem that 
buffers the entire file in memory (for these experiments) before deserializing 
the data. This change brings the throughput back to almost the same level as 
the local disk runs.
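
Roughly, the buffering works like the following simplified sketch (not the 
actual InMemoryLoadingFileSystem code, but the same idea: download the whole 
object once and serve every seek/read from memory):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;

public class InMemoryLoadingFileSystem extends GoogleHadoopFileSystem {

    @Override
    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
        // One large sequential GCS read instead of many small ranged reads;
        // assumes the object fits into a single byte[] (true for these files).
        byte[] data = new byte[(int) getFileStatus(path).getLen()];
        try (FSDataInputStream in = super.open(path, bufferSize)) {
            in.readFully(0, data);
        }
        return new FSDataInputStream(new SeekableByteArrayInputStream(data));
    }

    // Minimal in-memory stream supporting the seek/positioned-read calls made
    // by the parquet reader.
    private static class SeekableByteArrayInputStream extends ByteArrayInputStream
            implements Seekable, PositionedReadable {

        SeekableByteArrayInputStream(byte[] data) {
            super(data);
        }

        @Override
        public void seek(long newPos) {
            this.pos = (int) newPos;
        }

        @Override
        public long getPos() {
            return pos;
        }

        @Override
        public boolean seekToNewSource(long targetPos) {
            return false;
        }

        @Override
        public int read(long position, byte[] buffer, int offset, int length) {
            int available = count - (int) position;
            if (available <= 0) {
                return -1;
            }
            int toRead = Math.min(length, available);
            System.arraycopy(buf, (int) position, buffer, offset, toRead);
            return toRead;
        }

        @Override
        public void readFully(long position, byte[] buffer, int offset, int length)
                throws IOException {
            if (read(position, buffer, offset, length) < length) {
                throw new IOException("Reached end of buffered file");
            }
        }

        @Override
        public void readFully(long position, byte[] buffer) throws IOException {
            readFully(position, buffer, 0, buffer.length);
        }
    }
}

With this in place the reader code runs unchanged, since the file system is 
swapped in via fs.gs.impl as shown in the config code above.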


Row-by-row reader:

n=50 read throughput=92.688 MiB/s

n=100 read throughput=100.278 MiB/s

n=200 read throughput=105.137 MiB/s

n=300 read throughput=106.432 MiB/s


Arrow reader:

n=50 read throughput=282.355 MiB/s

n=100 read throughput=264.336 MiB/s

n=200 read throughput=320.418 MiB/s

n=300 read throughput=338.855 MiB/s

Thanks,
Mayur
