Hi Ashish,

You might try the approach that we took for the Flink writer. In Flink, we have multiple tasks writing data files. When a checkpoint completes, the data files are closed and a DataFile instance with all of the Iceberg metadata is sent to a committer task. Once all of the DataFile instances arrive at the committer task, it writes a ManifestFile with them and saves the manifest location in the checkpoint. After the checkpoint is persisted, the committer task runs an Iceberg commit to add the manifest to the table. (It also has checks to avoid committing twice when recovering from a checkpoint.)
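Roughly, the staging and commit steps could look like the following with the Iceberg API (this is only a minimal sketch, not the actual Flink committer code: the class name, method names, manifest path, and how the DataFile list is collected are placeholders, and checkpoint/recovery handling is left out):

import java.io.IOException;
import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;

public class StagedManifestCommitter {

  // Stage the collected data files as a single manifest file. This only writes
  // the manifest; it does not touch the table state, so no commit happens yet.
  static ManifestFile stageManifest(Table table, List<DataFile> dataFiles, String manifestPath)
      throws IOException {
    OutputFile out = table.io().newOutputFile(manifestPath); // manifestPath is a placeholder
    ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), out);
    try {
      dataFiles.forEach(writer::add);
    } finally {
      writer.close(); // finalizes the manifest and its per-file stats
    }
    return writer.toManifestFile();
  }

  // Later, add all of the staged manifests to the table in a single append,
  // which is a metadata-only commit for the whole batch.
  static void commitStaged(Table table, List<ManifestFile> staged) {
    AppendFiles append = table.newAppend();
    staged.forEach(append::appendManifest);
    append.commit();
  }
}

The quick append assumes the compatibility.snapshot-id-inheritance.enabled table property is set to "true" (for example via table.updateProperties()); otherwise the append has to rewrite the staged manifest to assign the snapshot ID.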
It sounds like this would work for you because you can use Iceberg utilities to stage data for an eventual commit. You can also use snapshot ID inheritance (thanks Anton!) to avoid rewriting the manifest file, so the append commit would be a really quick operation. Since you're already accumulating data files to commit all at once, you're on the right track to getting this to work. And async compaction is a great way to maintain the data, too.

rb

On Wed, Jul 15, 2020 at 1:24 PM Ashish Mehta <mehta.ashis...@gmail.com> wrote:
> Hi Team,
>
> We have a use case of ingesting a high frequency of small files into Iceberg. I know from the documentation that Iceberg targets slow-moving data, but the features Iceberg offers around reader/writer isolation, etc. are something we really need for our high-frequency small files (even though we can delay their ingestion; it's in our control).
> *The limitation we hit is: if we have a lot of incoming small files, each file is committed to Iceberg separately, and we keep failing because of optimistic concurrency.*
>
> To get around this limitation, we have implemented an approach that converts this high-frequency data to low frequency using the following:
> 1. *Buffering based on dynamic rules*: buffer input files and apply rules based on the criticality of the data. For instance, if the data is critical (based on some field inside the data) and has to be available downstream quickly, it is committed to Iceberg right away (after every 15-minute window).
> 2. *Eager compaction*: as part of committing after buffering, we do eager compaction to optimize for readers.
>
> This has worked well so far, but it has the pitfall of passing over the data twice, and hence more $$ (which is not a problem, as we need to compact the data anyway). Also, as the frequency of data increases, the eager compaction starts to take 20-30 minutes, so the critical data doesn't make it to Iceberg for readers, delaying everything for us.
> *We noticed that the work in [0] (thanks Saisai) solves #2 for us, i.e. we can do async compaction without spending another 20-30 minutes on critical data.*
>
> *Clarification*: Is there a way to buffer data for an Iceberg table without committing to the table's state (the limitation with optimistic concurrency)? That way we would have the metadata (i.e. manifests with stats) for the data files available, and in a second run it's just a matter of accumulating those data files and doing one commit that includes all of them. This would let us logically use the *Iceberg writer* configured for the target table, generate buffered output, and support the high-frequency ingestion use case.
> IIUC we can't buffer the data outside of Iceberg, because I need to generate stats for the data files the way Iceberg does, so that committing them later is just a metadata operation rather than a second pass over the data to generate those stats again.
>
> I am quite hopeful that this can be built on top of Iceberg, but I wanted to hear the community's thoughts and learn about any impediments people have in mind, and whether this might be a good addition to the community itself.
>
> NOTE: WAP (write-audit-publish) does something similar, but it commits to the table, so it doesn't help us with the optimistic concurrency limitation.
> [0]: https://github.com/apache/iceberg/pull/1083
>
> Thanks,
> Ashish

--
Ryan Blue
Software Engineer
Netflix