Hi Ashish,

Here is my thinking:

IIUC, the Spark writer (record writer) also buffers files as Iceberg
DataFiles. For every micro-batch, Spark:
- Closes the DataFiles at the end (at least one file per task, if the task
has records)
- Collects them on the driver side and does a snapshot commit (sketched
below).

So you can choose the tradeoff between file size and latency (also pay
attention to the number of partitions); see the sketch below:
- Fewer small files: choose a bigger micro-batch, at the cost of higher
latency.
- Lower latency: choose a smaller micro-batch, but possibly many small
files.
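
A minimal sketch of tuning that tradeoff through the micro-batch trigger,
assuming Iceberg's Spark structured streaming sink is on the classpath; the
schema, input path and table location below are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    val spark = SparkSession.builder().appName("iceberg-stream").getOrCreate()

    // Placeholder schema for the incoming small files.
    val incomingSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("event_time", TimestampType)))

    val input = spark.readStream
      .schema(incomingSchema)
      .parquet("s3://bucket/incoming/")                // placeholder input path

    input.writeStream
      .format("iceberg")
      .outputMode("append")
      // Bigger trigger interval => fewer, larger files; smaller => lower latency.
      .trigger(Trigger.ProcessingTime("15 minutes"))
      .option("path", "s3://bucket/warehouse/db/tbl")  // placeholder table location
      .option("checkpointLocation", "s3://bucket/checkpoints/db/tbl")
      .start()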

But the current writer is designed for incoming records, not for incoming
files.
- One easy way is to read the files back into records and write the records
to Iceberg using the Spark record writer (see the sketch after this list).
- A second way is to write a Spark file writer: you can decide which files
should be merged and which can simply be kept as they are, and then commit
the files at the end of the micro-batch, just like the Spark record writer
does.
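
A minimal sketch of the first way (placeholder paths and table name; the
buffered parquet files are simply read back as records and appended through
the regular Iceberg Spark writer, so file sizing, stats and schema field ids
all come from the target table):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()

    spark.read
      .parquet("s3://bucket/incoming/batch-0001/")  // the buffered small files
      .write
      .format("iceberg")
      .mode("append")
      .save("db.target_table")                      // target Iceberg table

The cost is that every record is rewritten once on the way in.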

As for how to convert parquet files to Iceberg DataFiles without rewriting
records, you can take a look at `SparkTableUtil.listPartition`.
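
For reference, a hedged sketch of that metadata-only path: build DataFile
entries from the existing parquet files (size, record count and, ideally,
column metrics taken from the file footers) and commit them without
rewriting any records. The helper below is illustrative, not an existing
API; the file path, size, record count and partition path are assumed to be
known:

    import org.apache.iceberg.{DataFiles, FileFormat, Table}

    // Register an existing parquet file as an Iceberg DataFile; the commit
    // itself is then a pure metadata operation.
    def registerParquetFile(table: Table,
                            path: String,
                            sizeInBytes: Long,
                            recordCount: Long,
                            partitionPath: String): Unit = {
      val dataFile = DataFiles.builder(table.spec())
        .withPath(path)
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(sizeInBytes)
        .withRecordCount(recordCount)
        .withPartitionPath(partitionPath)  // e.g. "dt=2020-07-16", matching the table's spec
        .build()

      table.newAppend()
        .appendFile(dataFile)
        .commit()
    }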

Best,
Jingsong

On Thu, Jul 16, 2020 at 5:49 AM Ashish Mehta <mehta.ashis...@gmail.com>
wrote:

> Thanks Ryan! It sounds like exactly the problem we are hitting, but we are
> entirely in the Spark domain, i.e. each small file is validated using Spark
> and is buffered right now as a parquet table. We are looking to buffer them
> as Iceberg DataFiles, so that the commit operation doesn't have to read
> those parquet tables for merging, and instead just picks up those DataFiles
> and commits them to the target Iceberg table.
>
> Are there any utilities for staging those buffered files using Spark, on
> any Iceberg table? It's important to write using the Iceberg writer of the
> target table, so that the ID mapping and related metadata don't break.
> If not, then I would love to hear thoughts around this, and I can pick up
> the implementation with some pointers.
>
> Regards,
> Ashish
>
> On Wed, Jul 15, 2020 at 2:06 PM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Hi Ashish,
>>
>> You might try the approach that we took for the Flink writer. In Flink,
>> we have multiple tasks writing data files. When a checkpoint completes, the
>> data files are closed and a DataFile instance with all of the Iceberg
>> metadata is sent to a committer task. Once all the DataFile instances
>> arrive at the committer task, it will write a ManifestFile with them and
>> save the manifest location in the checkpoint. After the checkpoint is
>> persisted, the committer task runs an Iceberg commit to add the manifest to
>> the table. (It also has checks to avoid committing twice if recovering from
>> the checkpoint.)
>>
>> It sounds like this would work for you because you can use Iceberg
>> utilities to stage data for an eventual commit. You can also use snapshot
>> ID inheritance (thanks Anton!) to avoid rewriting the manifest file, so the
>> append commit would be a really quick operation. Since you're already
>> accumulating data files to commit all at once, you're on the right track to
>> getting this to work. And async compaction is a great way to maintain the
>> data, too.
>>
>> rb
>>
>> On Wed, Jul 15, 2020 at 1:24 PM Ashish Mehta <mehta.ashis...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> We have a use case of ingesting a high frequency of small files into
>>> Iceberg. I know from the documentation that Iceberg is meant for
>>> slow-moving data, but we really need the features Iceberg has around
>>> reader/writer isolation, etc., for our high-frequency small files (even
>>> though delaying their ingestion is within our control).
>>> *The limitation we hit is: if we have a lot of incoming small files,
>>> then each file is committed to Iceberg separately, and we keep failing
>>> because of optimistic concurrency.*
>>>
>>> To get around this limitation, we have implemented an approach that
>>> converts this high-frequency data to low frequency using the following:
>>> 1. *Buffering based on dynamic rules*: buffer the input files and apply
>>> some rules based on the criticality of the data; for instance, if the
>>> data is critical (based on some field inside the data) and has to be
>>> available downstream quickly, it is committed to Iceberg right away
>>> (after every 15 min window).
>>> 2. *Eager compaction*: as part of committing after buffering, we also do
>>> eager compaction to optimize for readers.
>>>
>>> This has worked well so far, but we know it has the pitfall of passing
>>> over the data twice, and hence more $$ (which is not a problem, as we
>>> need to compact the data anyway). Also, as the frequency of data
>>> increases, the eager compaction starts to take 20-30 minutes, so the
>>> critical data doesn't make it to Iceberg for readers, delaying everything
>>> for us.
>>> *We noticed that the work in [0] (thanks Saisai) solves #2 for us, i.e.
>>> we can do async compaction without having to spend another 20-30 min on
>>> critical data.*
>>>
>>> *Clarification*: Is there a way to buffer data for an Iceberg table
>>> without having to commit to the table's state (the limitation with
>>> optimistic concurrency)? That way we would have the metadata (i.e.
>>> manifests with stats) for the DataFiles available, and in the second run
>>> it would just be a matter of accumulating those DataFiles and doing one
>>> commit with all the buffered DataFiles in it. This would let us logically
>>> use the *Iceberg writer* of the target table, generate buffered output,
>>> and achieve the use case of high-frequency incoming data.
>>> IIUC we can't buffer the data outside of Iceberg, because I need to
>>> generate stats for the DataFiles the way Iceberg does, so that committing
>>> them is just a metadata operation, as opposed to doing a second pass over
>>> the data to generate those stats again.
>>>
>>> I am quite hopeful that this can be hacked together on top of Iceberg,
>>> but I wanted to hear the community's thoughts and learn about any
>>> impediments that come to mind, and also whether this might be a good
>>> addition to the community itself.
>>>
>>> NOTE: WAP (write-audit-publish) does something similar, but it does
>>> commit to the table, so it doesn't help us with the optimistic
>>> concurrency limitation.
>>>
>>> [0]: https://github.com/apache/iceberg/pull/1083
>>>
>>> Thanks,
>>> Ashish
>>>
>>>
>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Best, Jingsong Lee
