Hi Ashish,

Here is my thinking:
IIUC, the Spark writer (record writer) also buffers data as Iceberg DataFiles. For every micro-batch, Spark:
- closes the DataFiles at the end (at least one file per task, if the task has records), and
- collects them on the driver side and does a snapshot commit.

So you can choose the tradeoff between file size and latency (also pay attention to the number of partitions):
- Fewer small files: choose a bigger micro-batch, but accept higher latency.
- Lower latency: choose a smaller micro-batch, but possibly end up with many small files.

But the current writer is designed for incoming records, not incoming files:
- One easy way is to read the files into records and write the records to Iceberg using the Spark record writer.
- A second way is to write a Spark file writer: you decide which files should be merged and which files can simply be kept, and you commit the files at the end of the micro-batch the same way the Spark record writer does.

About how to convert parquet files to Iceberg DataFiles without rewriting records, you can take a look at `SparkTableUtil.listPartition`. (Rough sketches of both ideas, wrapping existing parquet files as DataFiles and staging them in a manifest for a later single commit, are appended at the bottom of this mail.)

Best,
Jingsong

On Thu, Jul 16, 2020 at 5:49 AM Ashish Mehta <mehta.ashis...@gmail.com> wrote:

> Thanks Ryan! It sounds like exactly the problem we are hitting, but we are entirely in the Spark domain, i.e. each small file is validated using Spark and is buffered right now as a parquet table. We are looking to buffer them as Iceberg DataFiles, so that the commit operation doesn't have to read those parquet tables for merging and can instead just pick up those DataFiles and commit them on the target Iceberg table.
>
> Are there any utilities for staging those buffered files using Spark, on any Iceberg table? It's important to write using the Iceberg writer of the target table, so that the ID mapping and so on don't break.
> If not, then I would love to hear thoughts around this, and can pick up the implementation, with some pointers.
>
> Regards,
> Ashish
>
> On Wed, Jul 15, 2020 at 2:06 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Hi Ashish,
>>
>> You might try the approach that we took for the Flink writer. In Flink, we have multiple tasks writing data files. When a checkpoint completes, the data files are closed and a DataFile instance with all of the Iceberg metadata is sent to a committer task. Once all the DataFile instances arrive at the committer task, it writes a ManifestFile with them and saves the manifest location in the checkpoint. After the checkpoint is persisted, the committer task runs an Iceberg commit to add the manifest to the table. (It also has checks to avoid committing twice when recovering from a checkpoint.)
>>
>> It sounds like this would work for you because you can use Iceberg utilities to stage data for an eventual commit. You can also use snapshot ID inheritance (thanks Anton!) to avoid rewriting the manifest file, so the append commit would be a really quick operation. Since you're already accumulating data files to commit all at once, you're on the right track to getting this to work. And async compaction is a great way to maintain the data, too.
>>
>> rb
>>
>> On Wed, Jul 15, 2020 at 1:24 PM Ashish Mehta <mehta.ashis...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> We had a use case of ingesting a high frequency of small files into Iceberg.
>>> I know from the documentation that Iceberg is for slow-moving data, but we really need the features Iceberg has around reader/writer isolation, etc., for our high-frequency small files (even though we can delay their ingestion; it's in our control).
>>> *The limitation we hit is: if we have a lot of incoming small files, then each file is committed to Iceberg separately, and we keep failing because of optimistic concurrency.*
>>>
>>> To get around this limitation, we have implemented an approach that converts this high-frequency data to low frequency using the following:
>>> 1. *Buffering based on dynamic rules*: buffer input files and apply some rules based on the criticality of the data; for instance, if the data is critical (based on some field inside the data) and has to be available downstream quickly, it is committed to Iceberg right away (after every 15-minute window).
>>> 2. *Eager compaction*: as part of committing after buffering, we also do eager compaction to optimize readers.
>>>
>>> This has worked well so far, but we know it has the pitfall of passing over the data twice, and hence more $$ (which is not a problem, as we need to compact the data anyway). Also, as the frequency of data increases, the eager compaction starts to take 20-30 minutes, so the critical data doesn't make it to Iceberg for readers, delaying everything for us.
>>> *We noticed that the work in [0] (thanks Saisai) solves #2 for us, i.e. we can do async compaction without having to spend another 20-30 min on critical data.*
>>>
>>> *Clarification*: Is there a way to buffer data for an Iceberg table without having to commit it to the table's state (the limitation with optimistic concurrency)? This way, we would have the metadata (i.e. a manifest with stats) for the DataFiles available, and in the second run it would just be a matter of accumulating these DataFiles and doing one commit with all the buffered DataFiles in it. This would let us logically use the *Iceberg writer* derived from the target table, generate buffered output, and achieve the use case of high-frequency flowing data.
>>> IIUC we can't buffer the data outside of Iceberg, because I need to generate stats for the DataFiles the way Iceberg does, so that committing is just a metadata operation rather than a second pass over the data to generate those stats again.
>>>
>>> I am quite hopeful that this can be hacked together on top of Iceberg, but I wanted to know the community's thoughts and hear about any impediments that come to mind, and also whether this might be a good addition to the community itself.
>>>
>>> NOTE: WAP (write-audit-publish) does something similar, but it does commit to the table, so it doesn't help us with the optimistic-concurrency limitation.
>>>
>>> [0]: https://github.com/apache/iceberg/pull/1083
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

--
Best,
Jingsong Lee
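Appended sketch 1, re: converting parquet files to Iceberg DataFiles without rewriting records. This is a hedged illustration using the core Iceberg API rather than `SparkTableUtil` itself; the class name, path, size, record count, and partition path below are placeholders, and in practice the record count and column stats would be read from the parquet footer (which is what `SparkTableUtil.listPartition` does internally).

```java
// Hedged sketch: wrap an already-written parquet file as an Iceberg DataFile
// without rewriting its records. All literal values below are placeholders.
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;

public class DataFileFromParquetSketch {
  // Assumes a partitioned table; for an unpartitioned table, drop withPartitionPath.
  static DataFile toDataFile(Table table) {
    return DataFiles.builder(table.spec())
        .withPath("s3://bucket/staging/part-00000.parquet")  // placeholder path
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(12_345L)        // placeholder; use the real file length
        .withRecordCount(1_000L)             // placeholder; read from the parquet footer
        .withPartitionPath("dt=2020-07-15")  // placeholder partition path
        .build();
    // For real column-level stats (value counts, null counts, bounds), pass the
    // metrics derived from the parquet footer via DataFiles.Builder#withMetrics.
  }

  // Appending such DataFiles is then a metadata-only commit:
  //   table.newAppend().appendFile(dataFile).commit();
}
```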
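Appended sketch 2, re: staging buffered DataFiles without committing to the table, then doing one commit later (the Flink-style pattern described above). Again a hedged sketch, not the actual Flink committer code: the class and method names and the manifest location are assumptions, and adding the staged manifests without rewriting them assumes the `compatibility.snapshot-id-inheritance.enabled` table property is set.

```java
// Hedged sketch: stage buffered DataFiles into a ManifestFile (no table commit,
// so no optimistic-concurrency conflicts while buffering), then commit all
// staged manifests later in a single metadata-only append.
import java.io.IOException;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;

public class StagedCommitSketch {

  // Buffer step: write a manifest that points at the buffered DataFiles.
  static ManifestFile stage(Table table, Iterable<DataFile> buffered,
                            OutputFile manifestLocation) throws IOException {
    ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), manifestLocation);
    try {
      for (DataFile file : buffered) {
        writer.add(file);
      }
    } finally {
      writer.close();
    }
    return writer.toManifestFile();
  }

  // Commit step: append every staged manifest in one snapshot. With
  // compatibility.snapshot-id-inheritance.enabled=true on the table, the staged
  // manifests should be added as-is instead of being rewritten.
  static void commitStaged(Table table, Iterable<ManifestFile> staged) {
    AppendFiles append = table.newAppend();
    for (ManifestFile manifest : staged) {
      append.appendManifest(manifest);
    }
    append.commit();
  }
}
```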